http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java new file mode 100644 index 0000000..246565d --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.core; + +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +public class MetricsCollectorInterface implements Serializable { + + private static String hostName = null; + private String instanceId = null; + public final static String serviceName = "anomaly-engine"; + private String collectorHost; + private String protocol; + private String port; + private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; + private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class); + private static ObjectMapper mapper; + private final static ObjectReader timelineObjectReader; + + static { + mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + mapper.setAnnotationIntrospector(introspector); + mapper.getSerializationConfig() + .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + timelineObjectReader = mapper.reader(TimelineMetrics.class); + } + + public 
MetricsCollectorInterface(String collectorHost, String protocol, String port) { + this.collectorHost = collectorHost; + this.protocol = protocol; + this.port = port; + this.hostName = getDefaultLocalHostName(); + } + + public static String getDefaultLocalHostName() { + + if (hostName != null) { + return hostName; + } + + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.info("Error getting host address"); + } + return null; + } + + public void publish(List<MetricAnomaly> metricAnomalies) { + if (CollectionUtils.isNotEmpty(metricAnomalies)) { + LOG.info("Sending metric anomalies of size : " + metricAnomalies.size()); + List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies); + if (!metricList.isEmpty()) { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + emitMetrics(timelineMetrics); + } + } else { + LOG.debug("No anomalies to send."); + } + } + + private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) { + List<TimelineMetric> metrics = new ArrayList<>(); + + if (metricAnomalies.isEmpty()) { + return metrics; + } + + for (MetricAnomaly anomaly : metricAnomalies) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(anomaly.getMetricKey()); + timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType()); + timelineMetric.setInstanceId(null); + timelineMetric.setHostName(getDefaultLocalHostName()); + timelineMetric.setStartTime(anomaly.getTimestamp()); + HashMap<String, String> metadata = new HashMap<>(); + metadata.put("method", anomaly.getMethodType()); + metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore())); + timelineMetric.setMetadata(metadata); + TreeMap<Long,Double> metricValues = new TreeMap<>(); + metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue()); + timelineMetric.setMetricValues(metricValues); + + 
metrics.add(timelineMetric); + } + return metrics; + } + + public boolean emitMetrics(TimelineMetrics metrics) { + String connectUrl = constructTimelineMetricUri(); + String jsonData = null; + LOG.debug("EmitMetrics connectUrl = " + connectUrl); + try { + jsonData = mapper.writeValueAsString(metrics); + LOG.info(jsonData); + } catch (IOException e) { + LOG.error("Unable to parse metrics", e); + } + if (jsonData != null) { + return emitMetricsJson(connectUrl, jsonData); + } + return false; + } + + private HttpURLConnection getConnection(String spec) throws IOException { + return (HttpURLConnection) new URL(spec).openConnection(); + } + + private boolean emitMetricsJson(String connectUrl, String jsonData) { + int timeout = 10000; + HttpURLConnection connection = null; + try { + if (connectUrl == null) { + throw new IOException("Unknown URL. Unable to connect to metrics collector."); + } + connection = getConnection(connectUrl); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Connection", "Keep-Alive"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setDoOutput(true); + + if (jsonData != null) { + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonData.getBytes("UTF-8")); + } + } + + int statusCode = connection.getResponseCode(); + + if (statusCode != 200) { + LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + + "statusCode = " + statusCode); + } else { + LOG.info("Metrics posted to Collector " + connectUrl); + } + return true; + } catch (IOException ioe) { + LOG.error(ioe.getMessage()); + } + return false; + } + + private String constructTimelineMetricUri() { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(collectorHost); + sb.append(":"); + sb.append(port); + sb.append(WS_V1_TIMELINE_METRICS); + return sb.toString(); + } + + public TimelineMetrics 
fetchMetrics(String metricName, + String appId, + String hostname, + long startime, + long endtime) { + + String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId + + "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime; + LOG.debug("Fetch metrics URL : " + url); + + URL obj = null; + BufferedReader in = null; + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + try { + obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + LOG.debug("Sending 'GET' request to URL : " + url); + LOG.debug("Response Code : " + responseCode); + + in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + timelineMetrics = timelineObjectReader.readValue(in); + } catch (Exception e) { + LOG.error(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics."); + return timelineMetrics; + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java new file mode 100644 index 0000000..c579515 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.core; + +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class PointInTimeADSystem implements Serializable { + + //private EmaTechnique emaTechnique; + private MetricsCollectorInterface metricsCollectorInterface; + private Map<String, Double> tukeysNMap; + private double defaultTukeysN = 3; + + private long testIntervalMillis = 5*60*1000; //10mins + private long trainIntervalMillis = 15*60*1000; //1hour + + private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class); + + private AmbariServerInterface ambariServerInterface; + private int sensitivity = 50; + private int minSensitivity = 0; + private int maxSensitivity = 100; + + public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN, + long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) { + this.metricsCollectorInterface = metricsCollectorInterface; + this.defaultTukeysN = defaultTukeysN; + this.tukeysNMap = new HashMap<>(); + this.testIntervalMillis = testIntervalMillis; + this.trainIntervalMillis = trainIntervalMillis; + this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName); + LOG.info("Starting PointInTimeADSystem..."); + } + + public void 
runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) { + LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]"); + + int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity(); + if (requiredSensivity == -1 || requiredSensivity == sensitivity) { + LOG.info("No change in sensitivity needed."); + } else { + LOG.info("Current tukey's N value = " + defaultTukeysN); + if (requiredSensivity > sensitivity) { + int targetSensitivity = Math.min(maxSensitivity, requiredSensivity); + while (sensitivity < targetSensitivity) { + defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05; + sensitivity++; + } + } else { + int targetSensitivity = Math.max(minSensitivity, requiredSensivity); + while (sensitivity > targetSensitivity) { + defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05; + sensitivity--; + } + } + LOG.info("New tukey's N value = " + defaultTukeysN); + } + + TimelineMetrics timelineMetrics = new TimelineMetrics(); + for (String metricKey : emaTechnique.getTrackedEmas().keySet()) { + LOG.info("EMA key = " + metricKey); + EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey); + String metricName = emaModel.getMetricName(); + String appId = emaModel.getAppId(); + String hostname = emaModel.getHostname(); + + TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis), + startTime); + + if (tukeysData.getMetrics().isEmpty()) { + LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey); + continue; + } + + List<Double> trainTsList = new ArrayList<>(); + List<Double> trainDataList = new ArrayList<>(); + List<Double> testTsList = new ArrayList<>(); + List<Double> testDataList = new ArrayList<>(); + + for 
(TimelineMetric metric : tukeysData.getMetrics()) { + for (Long timestamp : metric.getMetricValues().keySet()) { + if (timestamp <= (startTime - testIntervalMillis)) { + trainDataList.add(metric.getMetricValues().get(timestamp)); + trainTsList.add((double)timestamp); + } else { + testDataList.add(metric.getMetricValues().get(timestamp)); + testTsList.add((double)timestamp); + } + } + } + + if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { + LOG.info("Not enough train/test data to perform analysis."); + continue; + } + + String tukeysTrainSeries = "tukeysTrainSeries"; + double[] trainTs = new double[trainTsList.size()]; + double[] trainData = new double[trainTsList.size()]; + for (int i = 0; i < trainTs.length; i++) { + trainTs[i] = trainTsList.get(i); + trainData[i] = trainDataList.get(i); + } + + String tukeysTestSeries = "tukeysTestSeries"; + double[] testTs = new double[testTsList.size()]; + double[] testData = new double[testTsList.size()]; + for (int i = 0; i < testTs.length; i++) { + testTs[i] = testTsList.get(i); + testData[i] = testDataList.get(i); + } + + LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); + + DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData); + DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData); + + if (!tukeysNMap.containsKey(metricKey)) { + tukeysNMap.put(metricKey, defaultTukeysN); + } + + Map<String, String> configs = new HashMap<>(); + configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey))); + + ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs); + + List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname); + LOG.info("Tukeys anomalies size : " + tukeysMetrics.size()); + TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>(); + + for (TimelineMetric tukeysMetric : tukeysMetrics) { + 
tukeysMetricValues.putAll(tukeysMetric.getMetricValues()); + timelineMetrics.addOrMergeTimelineMetric(tukeysMetric); + } + + TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime); + TreeMap<Long, Double> emaMetricValues = new TreeMap(); + if (!emaData.getMetrics().isEmpty()) { + emaMetricValues = emaData.getMetrics().get(0).getMetricValues(); + } + + LOG.info("Ema anomalies size : " + emaMetricValues.size()); + int tp = 0; + int tn = 0; + int fp = 0; + int fn = 0; + + for (double ts : testTs) { + long timestamp = (long) ts; + if (tukeysMetricValues.containsKey(timestamp)) { + if (emaMetricValues.containsKey(timestamp)) { + tp++; + } else { + fn++; + } + } else { + if (emaMetricValues.containsKey(timestamp)) { + fp++; + } else { + tn++; + } + } + } + + double recall = (double) tp / (double) (tp + fn); + double precision = (double) tp / (double) (tp + fp); + LOG.info("----------------------------"); + LOG.info("Precision Recall values for " + metricKey); + LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn); + LOG.info("----------------------------"); + + if (recall < 0.5) { + LOG.info("Increasing EMA sensitivity by 10%"); + emaModel.updateModel(true, 5); + } else if (precision < 0.5) { + LOG.info("Decreasing EMA sensitivity by 10%"); + emaModel.updateModel(false, 5); + } + + } + + if (emaTechnique.getTrackedEmas().isEmpty()){ + LOG.info("No EMA Technique keys tracked!!!!"); + } + + if (!timelineMetrics.getMetrics().isEmpty()) { + metricsCollectorInterface.emitMetrics(timelineMetrics); + } + } + + private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) { + + List<TimelineMetric> timelineMetrics = new ArrayList<>(); + + if (result == null) { + LOG.info("ResultSet from R call is null!!"); + return null; + } + + if 
(result.resultset.size() > 0) { + double[] ts = result.resultset.get(0); + double[] metrics = result.resultset.get(1); + double[] anomalyScore = result.resultset.get(2); + for (int i = 0; i < ts.length; i++) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname); + timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); + timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys"); + timelineMetric.setInstanceId(null); + timelineMetric.setStartTime((long) ts[i]); + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put((long) ts[i], metrics[i]); + + HashMap<String, String> metadata = new HashMap<>(); + metadata.put("method", "tukeys"); + if (String.valueOf(anomalyScore[i]).equals("infinity")) { + LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname); + } else { + metadata.put("anomaly-score", String.valueOf(anomalyScore[i])); + } + timelineMetric.setMetadata(metadata); + + timelineMetric.setMetricValues(metricValues); + timelineMetrics.add(timelineMetric); + } + } + + return timelineMetrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java new file mode 100644 index 0000000..4538f0b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.core; + + +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.rosuda.JRI.REXP; +import org.rosuda.JRI.RVector; +import org.rosuda.JRI.Rengine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class RFunctionInvoker { + + static final Log LOG = LogFactory.getLog(RFunctionInvoker.class); + public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null); + private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts"; + + private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) { + r.assign("train_ts", trainData.ts); + r.assign("train_x", trainData.values); + r.eval("train_data <- data.frame(train_ts,train_x)"); + r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")"); + + r.assign("test_ts", testData.ts); + r.assign("test_x", testData.values); + r.eval("test_data <- data.frame(test_ts,test_x)"); + r.eval("names(test_data) 
<- c(\"TS\", " + testData.seriesName + ")"); + } + + public static void setScriptsDir(String dir) { + rScriptDir = dir; + } + + public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) { + + ResultSet result; + switch (methodType) { + case "tukeys": + result = tukeys(trainData, testData, configs); + break; + case "ema": + result = ema_global(trainData, testData, configs); + break; + case "ks": + result = ksTest(trainData, testData, configs); + break; + case "hsdev": + result = hsdev(trainData, testData, configs); + break; + default: + result = tukeys(trainData, testData, configs); + break; + } + return result; + } + + public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + + REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')"); + + double n = Double.parseDouble(configs.get("tukeys.n")); + r.eval("n <- " + n); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_tukeys(train_data, test_data, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/ema.r" + "')"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ema_global(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + 
result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/ema.r" + "')"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ema_daily(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/kstest.r" + "')"); + + double p_value = Double.parseDouble(configs.get("ks.p_value")); + r.eval("p_value <- " + p_value); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_ks(train_data, test_data, p_value)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/hsdev.r" + "')"); + + int n = Integer.parseInt(configs.get("hsdev.n")); + r.eval("n <- " + n); + + int nhp = Integer.parseInt(configs.get("hsdev.nhp")); + r.eval("nhp <- " + nhp); + + long interval = 
Long.parseLong(configs.get("hsdev.interval")); + r.eval("interval <- " + interval); + + long period = Long.parseLong(configs.get("hsdev.period")); + r.eval("period <- " + period); + + loadDataSets(r, trainData, testData); + + r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)"); + REXP exp = r.eval("an2"); + RVector cont = (RVector) exp.getContent(); + + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java new file mode 100644 index 0000000..2a205d1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.core; + +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique; +import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class TrendADSystem implements Serializable { + + private MetricsCollectorInterface metricsCollectorInterface; + private List<TrendMetric> trendMetrics; + + private long ksTestIntervalMillis = 10 * 60 * 1000; + private long ksTrainIntervalMillis = 10 * 60 * 1000; + private KSTechnique ksTechnique; + + private HsdevTechnique hsdevTechnique; + private int hsdevNumHistoricalPeriods = 3; + + private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>(); + private static final Log LOG = LogFactory.getLog(TrendADSystem.class); + private String inputFile = ""; + + public TrendADSystem(MetricsCollectorInterface 
metricsCollectorInterface, + long ksTestIntervalMillis, + long ksTrainIntervalMillis, + int hsdevNumHistoricalPeriods) { + + this.metricsCollectorInterface = metricsCollectorInterface; + this.ksTestIntervalMillis = ksTestIntervalMillis; + this.ksTrainIntervalMillis = ksTrainIntervalMillis; + this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods; + + this.ksTechnique = new KSTechnique(); + this.hsdevTechnique = new HsdevTechnique(); + + trendMetrics = new ArrayList<>(); + } + + public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) { + readInputFile(inputFile); + + long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis; + LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " + + new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis) + + " : " + new Date(ksTestIntervalStartTime) + "]"); + + for (TrendMetric metric : trendMetrics) { + String metricName = metric.metricName; + String appId = metric.appId; + String hostname = metric.hostname; + String key = metricName + ":" + appId + ":" + hostname; + + TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis, + currentEndTime); + + if (ksData.getMetrics().isEmpty()) { + LOG.info("No metrics fetched for KS, metricKey = " + key); + continue; + } + + List<Double> trainTsList = new ArrayList<>(); + List<Double> trainDataList = new ArrayList<>(); + List<Double> testTsList = new ArrayList<>(); + List<Double> testDataList = new ArrayList<>(); + + for (TimelineMetric timelineMetric : ksData.getMetrics()) { + for (Long timestamp : timelineMetric.getMetricValues().keySet()) { + if (timestamp <= ksTestIntervalStartTime) { + trainDataList.add(timelineMetric.getMetricValues().get(timestamp)); + trainTsList.add((double) timestamp); + } else { + testDataList.add(timelineMetric.getMetricValues().get(timestamp)); + 
testTsList.add((double) timestamp); + } + } + } + + LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size()); + if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { + LOG.info("Not enough train/test data to perform KS analysis."); + continue; + } + + String ksTrainSeries = "KSTrainSeries"; + double[] trainTs = new double[trainTsList.size()]; + double[] trainData = new double[trainTsList.size()]; + for (int i = 0; i < trainTs.length; i++) { + trainTs[i] = trainTsList.get(i); + trainData[i] = trainDataList.get(i); + } + + String ksTestSeries = "KSTestSeries"; + double[] testTs = new double[testTsList.size()]; + double[] testData = new double[testTsList.size()]; + for (int i = 0; i < testTs.length; i++) { + testTs[i] = testTsList.get(i); + testData[i] = testDataList.get(i); + } + + LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); + + DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData); + DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData); + + MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData); + if (metricAnomaly == null) { + LOG.info("No anomaly from KS test."); + } else { + LOG.info("Found Anomaly in KS Test. 
Publishing KS Anomaly metric...."); + TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly, + ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + metricsCollectorInterface.emitMetrics(timelineMetrics); + + trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly); + } + } + + if (trendMetrics.isEmpty()) { + LOG.info("No Trend metrics tracked!!!!"); + } + + } + + private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly, + long testStart, + long testEnd, + long trainStart, + long trainEnd) { + + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metricAnomaly.getMetricKey()); + timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType()); + timelineMetric.setInstanceId(null); + timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); + timelineMetric.setStartTime(testEnd); + HashMap<String, String> metadata = new HashMap<>(); + metadata.put("method", metricAnomaly.getMethodType()); + metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore())); + metadata.put("test-start-time", String.valueOf(testStart)); + metadata.put("train-start-time", String.valueOf(trainStart)); + metadata.put("train-end-time", String.valueOf(trainEnd)); + timelineMetric.setMetadata(metadata); + TreeMap<Long,Double> metricValues = new TreeMap<>(); + metricValues.put(testEnd, metricAnomaly.getMetricValue()); + timelineMetric.setMetricValues(metricValues); + return timelineMetric; + + } + + public void runHsdevMethod() { + + List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>(); + + for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) { + + long hsdevTestEnd = ksSingleRunKey.endTime; + 
long hsdevTestStart = ksSingleRunKey.startTime; + + long period = hsdevTestEnd - hsdevTestStart; + + long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period; + long hsdevTrainEnd = hsdevTestStart; + + LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " + + new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart) + + " : " + new Date(hsdevTrainEnd) + "]"); + + String metricName = ksSingleRunKey.metricName; + String appId = ksSingleRunKey.appId; + String hostname = ksSingleRunKey.hostname; + String key = metricName + "_" + appId + "_" + hostname; + + TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics( + metricName, + appId, + hostname, + hsdevTrainStart, + hsdevTestEnd); + + if (hsdevData.getMetrics().isEmpty()) { + LOG.info("No metrics fetched for HSDev, metricKey = " + key); + continue; + } + + List<Double> trainTsList = new ArrayList<>(); + List<Double> trainDataList = new ArrayList<>(); + List<Double> testTsList = new ArrayList<>(); + List<Double> testDataList = new ArrayList<>(); + + for (TimelineMetric timelineMetric : hsdevData.getMetrics()) { + for (Long timestamp : timelineMetric.getMetricValues().keySet()) { + if (timestamp <= hsdevTestStart) { + trainDataList.add(timelineMetric.getMetricValues().get(timestamp)); + trainTsList.add((double) timestamp); + } else { + testDataList.add(timelineMetric.getMetricValues().get(timestamp)); + testTsList.add((double) timestamp); + } + } + } + + if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { + LOG.info("Not enough train/test data to perform Hsdev analysis."); + continue; + } + + String hsdevTrainSeries = "HsdevTrainSeries"; + double[] trainTs = new double[trainTsList.size()]; + double[] trainData = new double[trainTsList.size()]; + for (int i = 0; i < trainTs.length; i++) { + trainTs[i] = trainTsList.get(i); + trainData[i] = trainDataList.get(i); + } + + String 
hsdevTestSeries = "HsdevTestSeries"; + double[] testTs = new double[testTsList.size()]; + double[] testData = new double[testTsList.size()]; + for (int i = 0; i < testTs.length; i++) { + testTs[i] = testTsList.get(i); + testData[i] = testDataList.get(i); + } + + LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); + + DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData); + DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData); + + MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData); + if (metricAnomaly == null) { + LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. "); + ksTechnique.updateModel(key, false, 10); + } else { + LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly."); + hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly, + hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd)); + } + } + clearTrackedKsRunKeys(); + + if (!hsdevMetricAnomalies.isEmpty()) { + LOG.info("Publishing Hsdev Anomalies...."); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(hsdevMetricAnomalies); + metricsCollectorInterface.emitMetrics(timelineMetrics); + } + } + + private void clearTrackedKsRunKeys() { + trackedKsAnomalies.clear(); + } + + private void readInputFile(String fileName) { + trendMetrics.clear(); + try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { + for (String line; (line = br.readLine()) != null; ) { + String[] splits = line.split(","); + LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]); + trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2])); + } + } catch (IOException e) { + LOG.error("Error reading input file : " + e); + } + } + + class KsSingleRunKey implements Serializable{ + + long startTime; + long endTime; + String metricName; + String appId; + String hostname; + + public KsSingleRunKey(long 
startTime, long endTime, String metricName, String appId, String hostname) { + this.startTime = startTime; + this.endTime = endTime; + this.metricName = metricName; + this.appId = appId; + this.hostname = hostname; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java new file mode 100644 index 0000000..0640142 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Names one metric series to be tracked: a metric name, the application id that reports it,
 * and the host it is reported from.
 */
public class TrendMetric implements Serializable {

  String metricName;
  String appId;
  String hostname;

  /**
   * @param metricName name of the metric
   * @param appId      application id reporting the metric
   * @param hostname   host the metric is reported from
   */
  public TrendMetric(String metricName, String appId, String hostname) {
    this.hostname = hostname;
    this.appId = appId;
    this.metricName = metricName;
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.methods; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +import java.sql.Time; +import java.util.List; +import java.util.Map; + +public abstract class AnomalyDetectionTechnique { + + protected String methodType; + + public abstract List<MetricAnomaly> test(TimelineMetric metric); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java new file mode 100644 index 0000000..da4f030 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.methods; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class MetricAnomaly implements Serializable{ + + private String methodType; + private double anomalyScore; + private String metricKey; + private long timestamp; + private double metricValue; + + + public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) { + this.metricKey = metricKey; + this.timestamp = timestamp; + this.metricValue = metricValue; + this.methodType = methodType; + this.anomalyScore = anomalyScore; + + } + + public String getMethodType() { + return methodType; + } + + public void setMethodType(String methodType) { + this.methodType = methodType; + } + + public double getAnomalyScore() { + return anomalyScore; + } + + public void setAnomalyScore(double anomalyScore) { + this.anomalyScore = anomalyScore; + } + + public void setMetricKey(String metricKey) { + this.metricKey = metricKey; + } + + public String getMetricKey() { + return metricKey; + } + + public void setMetricName(String metricName) { + this.metricKey = metricName; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getMetricValue() { + return metricValue; + } + + public void setMetricValue(double metricValue) { + this.metricValue = metricValue; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java new file mode 100644 index 0000000..a31410d --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.methods.ema; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; + +import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold; + +@XmlRootElement +public class EmaModel implements Serializable { + + private String metricName; + private String hostname; + private String appId; + private double ema; + private double ems; + private double weight; + private double timessdev; + + private int ctr = 0; + + private static final Log LOG = LogFactory.getLog(EmaModel.class); + + public EmaModel(String name, String hostname, String appId, double weight, double timessdev) { + this.metricName = name; + this.hostname = hostname; + this.appId = appId; + this.weight = weight; + this.timessdev = timessdev; + this.ema = 0.0; + this.ems = 0.0; + } + + public String getMetricName() { + return metricName; + } + + public String getHostname() { + return hostname; + } + + public String getAppId() { + return appId; + } + + public double testAndUpdate(double metricValue) { + + double anomalyScore = 0.0; + LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev); + update(metricValue); + if (ctr > suppressAnomaliesTheshold) { + anomalyScore = test(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } else { + LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } + } else { + ctr++; + if (ctr > suppressAnomaliesTheshold) { + LOG.info("Ema Model for " + metricName + 
":" + appId + ":" + hostname + " is ready for testing data."); + } + } + return anomalyScore; + } + + public void update(double metricValue) { + ema = weight * ema + (1 - weight) * metricValue; + ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0)); + LOG.debug("In update : ema = " + ema + ", ems = " + ems); + } + + public double test(double metricValue) { + LOG.debug("In test : ema = " + ema + ", ems = " + ems); + double diff = Math.abs(ema - metricValue) - (timessdev * ems); + LOG.debug("diff = " + diff); + if (diff > 0) { + return Math.abs((metricValue - ema) / ems); //Z score + } else { + return 0.0; + } + } + + public void updateModel(boolean increaseSensitivity, double percent) { + LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); + double delta = percent / 100; + if (increaseSensitivity) { + delta = delta * -1; + } + this.timessdev = timessdev + delta * timessdev; + //this.weight = Math.min(1.0, weight + delta * weight); + LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight); + } + + public double getWeight() { + return weight; + } + + public void setWeight(double weight) { + this.weight = weight; + } + + public double getTimessdev() { + return timessdev; + } + + public void setTimessdev(double timessdev) { + this.timessdev = timessdev; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java new file mode 100644 index 0000000..62749c1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.methods.ema; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.util.Loader; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class EmaModelLoader implements Loader<EmaTechnique> { + private static final Log LOG = LogFactory.getLog(EmaModelLoader.class); + + @Override + public EmaTechnique load(SparkContext sc, String path) { + return new EmaTechnique(0.5,3); +// Gson gson = new Gson(); +// try { +// String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8); +// return gson.fromJson(fileString, EmaTechnique.class); +// } catch (IOException e) { +// LOG.error(e); +// } +// return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java new file mode 100644 index 0000000..52c6cf3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.methods.ema; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.util.Saveable; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.io.Writer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@XmlRootElement +public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable { + + @XmlElement(name = "trackedEmas") + private Map<String, EmaModel> trackedEmas; + private static final Log LOG = LogFactory.getLog(EmaTechnique.class); + + private double startingWeight = 0.5; + private double startTimesSdev = 3.0; + private String methodType = "ema"; + public static int suppressAnomaliesTheshold = 100; + + public EmaTechnique(double startingWeight, double 
startTimesSdev, int suppressAnomaliesTheshold) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold; + LOG.info("New EmaTechnique......"); + } + + public EmaTechnique(double startingWeight, double startTimesSdev) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + LOG.info("New EmaTechnique......"); + } + + public List<MetricAnomaly> test(TimelineMetric metric) { + String metricName = metric.getMetricName(); + String appId = metric.getAppId(); + String hostname = metric.getHostName(); + String key = metricName + ":" + appId + ":" + hostname; + + EmaModel emaModel = trackedEmas.get(key); + if (emaModel == null) { + LOG.debug("EmaModel not present for " + key); + LOG.debug("Number of tracked Emas : " + trackedEmas.size()); + emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev); + trackedEmas.put(key, emaModel); + } else { + LOG.debug("EmaModel already present for " + key); + } + + List<MetricAnomaly> anomalies = new ArrayList<>(); + + for (Long timestamp : metric.getMetricValues().keySet()) { + double metricValue = metric.getMetricValues().get(timestamp); + double anomalyScore = emaModel.testAndUpdate(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore); + MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); + anomalies.add(metricAnomaly); + } else { + LOG.debug("Discarding non-anomaly for : " + key); + } + } + return anomalies; + } + + public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) { + String metricName = timelineMetric.getMetricName(); + String appId = timelineMetric.getAppId(); + String hostname = timelineMetric.getHostName(); + String key = metricName + "_" + appId + 
"_" + hostname; + + + EmaModel emaModel = trackedEmas.get(key); + + if (emaModel == null) { + LOG.warn("EMA Model for " + key + " not found"); + return false; + } + emaModel.updateModel(increaseSensitivity, percent); + + return true; + } + + @Override + public void save(SparkContext sc, String path) { + Gson gson = new Gson(); + try { + String json = gson.toJson(this); + try (Writer writer = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(path), "utf-8"))) { + writer.write(json); + } + } catch (IOException e) { + LOG.error(e); + } + } + + @Override + public String formatVersion() { + return "1.0"; + } + + public Map<String, EmaModel> getTrackedEmas() { + return trackedEmas; + } + + public double getStartingWeight() { + return startingWeight; + } + + public double getStartTimesSdev() { + return startTimesSdev; + } + +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java new file mode 100644 index 0000000..04f4a73 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev; + +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median; +import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev; + +import java.io.Serializable; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class HsdevTechnique implements Serializable { + + private Map<String, Double> hsdevMap; + private String methodType = "hsdev"; + private static final Log LOG = LogFactory.getLog(HsdevTechnique.class); + + public HsdevTechnique() { + hsdevMap = new HashMap<>(); + } + + public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) { + int testLength = testData.values.length; + int trainLength = trainData.values.length; + + if (trainLength < testLength) { + LOG.info("Not enough train data."); + return null; + } + + if (!hsdevMap.containsKey(key)) { + hsdevMap.put(key, 3.0); + } + + double n = hsdevMap.get(key); + + double historicSd = sdev(trainData.values, false); + double historicMedian = median(trainData.values); + double currentMedian = 
median(testData.values); + + + if (historicSd > 0) { + double diff = Math.abs(currentMedian - historicMedian); + LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1])); + LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); + + if (diff > n * historicSd) { + double zScore = diff / historicSd; + LOG.info("Z Score of current series : " + zScore); + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + zScore); + } + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java new file mode 100644 index 0000000..a9360d3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.prototype.methods.kstest; + +import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +
/**
 * Anomaly detection technique that hands a train series and a test series to the KS R function
 * via {@link RFunctionInvoker#ksTest} and turns a non-empty result into a {@link MetricAnomaly}.
 * <p>
 * A p-value is kept per metric key. It starts at {@value #DEFAULT_P_VALUE} the first time a key
 * is seen by {@link #runKsTest} and can be scaled afterwards with {@link #updateModel}.
 */
public class KSTechnique implements Serializable {

  /** P-value assigned to a metric key the first time it is tested. */
  private static final double DEFAULT_P_VALUE = 0.05;

  /** Number of result columns that {@link #runKsTest} reads (indices 0, 2 and 3). */
  private static final int REQUIRED_RESULT_COLUMNS = 4;

  private String methodType = "ks";
  private Map<String, Double> pValueMap;
  private static final Log LOG = LogFactory.getLog(KSTechnique.class);

  public KSTechnique() {
    // Parameterized instead of the raw HashMap to avoid an unchecked assignment.
    pValueMap = new HashMap<>();
  }

  /**
   * Runs the KS R function for one metric.
   *
   * @param key       metric key, used to look up the p-value and to label the anomaly
   * @param trainData reference series; must hold at least as many values as {@code testData}
   * @param testData  series under test; must not be empty
   * @return an anomaly stamped with the last test timestamp and value, or {@code null} when
   *         there is not enough data, the R invocation yields no result, or the result does
   *         not have the expected columns
   */
  public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {

    int testLength = testData.values.length;
    int trainLength = trainData.values.length;

    if (trainLength < testLength) {
      LOG.info("Not enough train data.");
      return null;
    }

    // The anomaly is stamped with the last test point, so an empty test series cannot be reported.
    if (testLength == 0) {
      LOG.info("No test data.");
      return null;
    }

    Double configuredPValue = pValueMap.get(key);
    if (configuredPValue == null) {
      configuredPValue = DEFAULT_P_VALUE;
      pValueMap.put(key, configuredPValue);
    }
    double pValue = configuredPValue;

    ResultSet result = RFunctionInvoker.ksTest(trainData, testData,
      Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
    if (result == null) {
      LOG.error("Resultset is null when invoking KS R function...");
      return null;
    }

    if (result.resultset.isEmpty()) {
      return null;
    }

    // Columns 2 and 3 are read below; a shorter result used to fail with IndexOutOfBoundsException.
    if (result.resultset.size() < REQUIRED_RESULT_COLUMNS) {
      LOG.warn("Unexpected KS result with " + result.resultset.size() + " columns for metric " + key);
      return null;
    }

    double[] dValues = result.resultset.get(2);
    double[] pValues = result.resultset.get(3);
    if (dValues.length == 0 || pValues.length == 0) {
      LOG.warn("Empty KS result columns for metric " + key);
      return null;
    }

    LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
    LOG.info("p_value = " + pValues[0]);
    double dValue = dValues[0];

    return new MetricAnomaly(key,
      (long) testData.ts[testLength - 1],
      testData.values[testLength - 1],
      methodType,
      dValue);
  }

  /**
   * Scales the stored p-value of a metric by {@code percent} percent.
   *
   * @param metricKey           key of a metric that has already been through {@link #runKsTest};
   *                            unknown keys are logged and ignored
   * @param increaseSensitivity {@code true} to raise the p-value, {@code false} to lower it
   * @param percent             size of the change, as a percentage of the current p-value
   */
  public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {

    LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);

    if (!pValueMap.containsKey(metricKey)) {
      LOG.error("Unknown metric key : " + metricKey);
      LOG.info("pValueMap :" + pValueMap.toString());
      return;
    }

    double delta = percent / 100;
    if (!increaseSensitivity) {
      delta = delta * -1;
    }

    double pValue = pValueMap.get(metricKey);
    // Keep the p-value inside [0, 1]; a decrease of more than 100% used to store a negative value.
    double newPValue = Math.max(0.0, Math.min(1.0, pValue + delta * pValue));
    pValueMap.put(metricKey, newPValue);
    LOG.info("New pValue = " + newPValue);
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java new file mode 100644 index 0000000..268cd15 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.prototype.testing.utilities; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; +import java.util.Map; +
/**
 * Mutable request bean for an anomaly test run. It names a train series and a test series
 * (each with a type, a size and free-form configs) and lists the methods to run together
 * with their configs. All properties are plain read/write accessors with no validation.
 */
@XmlRootElement
public class MetricAnomalyDetectorTestInput {

  // Train series description.
  private String trainDataName;
  private String trainDataType;
  private Map<String, String> trainDataConfigs;
  private int trainDataSize;

  // Test series description.
  private String testDataName;
  private String testDataType;
  private Map<String, String> testDataConfigs;
  private int testDataSize;

  // Methods to run and their settings.
  private List<String> methods;
  private Map<String, String> methodConfigs;

  /** Creates an input with every property left at its default (null / 0). */
  public MetricAnomalyDetectorTestInput() {
  }

  // ---- train series ----

  public String getTrainDataName() {
    return this.trainDataName;
  }

  public void setTrainDataName(String trainDataName) {
    this.trainDataName = trainDataName;
  }

  public String getTrainDataType() {
    return this.trainDataType;
  }

  public void setTrainDataType(String trainDataType) {
    this.trainDataType = trainDataType;
  }

  public Map<String, String> getTrainDataConfigs() {
    return this.trainDataConfigs;
  }

  public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
    this.trainDataConfigs = trainDataConfigs;
  }

  public int getTrainDataSize() {
    return this.trainDataSize;
  }

  public void setTrainDataSize(int trainDataSize) {
    this.trainDataSize = trainDataSize;
  }

  // ---- test series ----

  public String getTestDataName() {
    return this.testDataName;
  }

  public void setTestDataName(String testDataName) {
    this.testDataName = testDataName;
  }

  public String getTestDataType() {
    return this.testDataType;
  }

  public void setTestDataType(String testDataType) {
    this.testDataType = testDataType;
  }

  public Map<String, String> getTestDataConfigs() {
    return this.testDataConfigs;
  }

  public void setTestDataConfigs(Map<String, String> testDataConfigs) {
    this.testDataConfigs = testDataConfigs;
  }

  public int getTestDataSize() {
    return this.testDataSize;
  }

  public void setTestDataSize(int testDataSize) {
    this.testDataSize = testDataSize;
  }

  // ---- methods ----

  public List<String> getMethods() {
    return this.methods;
  }

  public void setMethods(List<String> methods) {
    this.methods = methods;
  }

  public Map<String, String> getMethodConfigs() {
    return this.methodConfigs;
  }

  public void setMethodConfigs(Map<String, String> methodConfigs) {
    this.methodConfigs = methodConfigs;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java new file mode 100644 index 0000000..6485ebb --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.prototype.testing.utilities; + +import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface; +import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +
/**
 * Test utility that generates (or reuses) a train series and a test series described by a
 * {@link MetricAnomalyDetectorTestInput}, runs the requested methods on them through
 * {@link RFunctionInvoker} and returns everything as {@link TimelineMetrics}.
 * <p>
 * Generated series are cached in {@link #timelineMetricMap} by series name, so a later request
 * that reuses a name gets the series generated by the first request.
 */
public class MetricAnomalyTester {

  public static String appId = MetricsCollectorInterface.serviceName;
  static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class);
  static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>();

  /** Directory handed to {@link RFunctionInvoker#setScriptsDir} before methods are executed. */
  private static final String R_SCRIPTS_DIR = "/etc/ambari-metrics-collector/conf/R-scripts";

  /** Spacing between generated data points. Declared as long so timestamp arithmetic cannot overflow int. */
  private static final long MINUTE_MILLIS = 60 * 1000L;

  /**
   * Builds the train and test series named in {@code input} and runs every requested method.
   *
   * @param input description of the series and the methods to run
   * @return the train series, the test series and one result series per method that produced
   *         one; a series is left out when its name is empty
   * @throws UnknownHostException if the local host name cannot be resolved
   */
  public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException {

    long currentTime = System.currentTimeMillis();
    TimelineMetrics timelineMetrics = new TimelineMetrics();
    String hostname = InetAddress.getLocalHost().getHostName();

    //Train data
    // Stays an empty metric when no name is given, so the method invocation below still gets an object.
    TimelineMetric metric1 = new TimelineMetric();
    if (StringUtils.isNotEmpty(input.getTrainDataName())) {
      metric1 = getOrCreateMetric(input.getTrainDataName(), input.getTrainDataType(),
        input.getTrainDataSize(), input.getTrainDataConfigs(), currentTime, hostname);
      timelineMetrics.getMetrics().add(metric1);
    } else {
      LOG.error("No train data name specified");
    }

    //Test data
    TimelineMetric metric2 = new TimelineMetric();
    if (StringUtils.isNotEmpty(input.getTestDataName())) {
      metric2 = getOrCreateMetric(input.getTestDataName(), input.getTestDataType(),
        input.getTestDataSize(), input.getTestDataConfigs(), currentTime, hostname);
      timelineMetrics.getMetrics().add(metric2);
    } else {
      LOG.warn("No test data name specified");
    }

    //Invoke method
    if (CollectionUtils.isNotEmpty(input.getMethods())) {
      RFunctionInvoker.setScriptsDir(R_SCRIPTS_DIR);
      for (String methodType : input.getMethods()) {
        ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs());
        TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname);
        if (timelineMetric != null) {
          timelineMetrics.getMetrics().add(timelineMetric);
        }
      }
    } else {
      LOG.warn("No anomaly method requested");
    }

    return timelineMetrics;
  }

  /**
   * Returns the cached series registered under {@code name}, generating and caching it first
   * when it does not exist yet.
   */
  private static TimelineMetric getOrCreateMetric(String name, String type, int size, Map<String, String> configs,
                                                  long currentTime, String hostname) {

    TimelineMetric metric = timelineMetricMap.get(name);
    if (metric != null) {
      return metric;
    }

    metric = new TimelineMetric();
    double[] series = MetricSeriesGeneratorFactory.generateSeries(type, size, configs);
    metric.setMetricName(name);
    metric.setAppId(appId);
    metric.setHostName(hostname);
    metric.setStartTime(currentTime);
    metric.setInstanceId(null);
    metric.setMetricValues(getAsTimeSeries(currentTime, series));
    timelineMetricMap.put(name, metric);
    return metric;
  }

  /**
   * Converts the result of a "tukeys" or "ema" run into a timeline metric.
   *
   * @return the converted metric, or {@code null} when {@code result} is null or the method
   *         type is neither "tukeys" nor "ema"
   */
  private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) {

    if (result == null) {
      return null;
    }

    // Constant-first comparison so a null method type is rejected instead of throwing.
    if (!"tukeys".equals(methodType) && !"ema".equals(methodType)) {
      return null;
    }

    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime);
    timelineMetric.setHostName(hostname);
    timelineMetric.setAppId(appId);
    timelineMetric.setInstanceId(null);
    timelineMetric.setStartTime(currentTime);

    TreeMap<Long, Double> metricValues = new TreeMap<>();
    // Columns 0 (timestamps) and 1 (values) are both read, so both must be present.
    if (result.resultset.size() > 1) {
      double[] ts = result.resultset.get(0);
      double[] metrics = result.resultset.get(1);
      // Guard against columns of different length rather than indexing past the shorter one.
      int points = Math.min(ts.length, metrics.length);
      if (points > 0) {
        timelineMetric.setStartTime((long) ts[0]);
      }
      for (int i = 0; i < points; i++) {
        metricValues.put((long) ts[i], metrics[i]);
      }
    }
    timelineMetric.setMetricValues(metricValues);
    return timelineMetric;
  }

  /**
   * Spreads {@code values} over one-minute steps so that the last value lands on
   * {@code currentTime}.
   */
  private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) {

    // Long arithmetic: the former int product overflowed for series longer than ~35,791 points.
    long timestamp = currentTime - (values.length - 1) * MINUTE_MILLIS;
    TreeMap<Long, Double> metricValues = new TreeMap<>();

    for (double value : values) {
      metricValues.put(timestamp, value);
      timestamp += MINUTE_MILLIS;
    }
    return metricValues;
  }

  /**
   * Flattens the metric's values into parallel timestamp/value arrays, in ascending timestamp
   * order, named {@code <metricName>_<appId>_<hostName>}.
   */
  private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) {

    TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues();
    double[] timestamps = new double[metricValues.size()];
    double[] values = new double[metricValues.size()];
    int i = 0;

    // entrySet() avoids a second tree lookup per timestamp.
    for (Map.Entry<Long, Double> point : metricValues.entrySet()) {
      timestamps[i] = point.getKey();
      values[i] = point.getValue();
      i++;
    }
    return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values);
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java new file mode 100644 index 0000000..b817f3e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.testing.utilities; +
/**
 * Class which was originally used to send test series from AMS to Spark through Kafka.
 * <p>
 * The class currently declares no members: its former implementation (a {@code Runnable} that
 * kept a set of configured series and published one value per series on each run) is retained
 * below only as commented-out code.
 */

public class TestMetricSeriesGenerator {
  //implements Runnable {

//  private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
//  private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
//  private TimelineMetricStore metricStore;
//  private String hostname;
//
//  public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
//    this.metricStore = metricStore;
//    try {
//      this.hostname = InetAddress.getLocalHost().getHostName();
//    } catch (UnknownHostException e) {
//      e.printStackTrace();
//    }
//  }
//
//  public void addSeries(TestSeriesInputRequest inputRequest) {
//    if (!configuredSeries.containsKey(inputRequest)) {
//      AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
//      configuredSeries.put(inputRequest, metricSeries);
//      LOG.info("Added series " + inputRequest.getSeriesName());
//    }
//  }
//
//  public void removeSeries(String seriesName) {
//    boolean isPresent = false;
//    TestSeriesInputRequest tbd = null;
//    for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
//      if (inputRequest.getSeriesName().equals(seriesName)) {
//        isPresent = true;
//        tbd = inputRequest;
//      }
//    }
//    if (isPresent) {
//      LOG.info("Removing series " + seriesName);
//      configuredSeries.remove(tbd);
//    } else {
//      LOG.info("Series not found : " + seriesName);
//    }
//  }
//
//  @Override
//  public void run() {
//    long currentTime = System.currentTimeMillis();
//    TimelineMetrics timelineMetrics = new TimelineMetrics();
//
//    for (TestSeriesInputRequest input : configuredSeries.keySet()) {
//      AbstractMetricSeries metricSeries = configuredSeries.get(input);
//      TimelineMetric timelineMetric = new TimelineMetric();
//      timelineMetric.setMetricName(input.getSeriesName());
//      timelineMetric.setAppId("anomaly-engine-test-metric");
//      timelineMetric.setInstanceId(null);
//      timelineMetric.setStartTime(currentTime);
//      timelineMetric.setHostName(hostname);
//      TreeMap<Long, Double> metricValues = new TreeMap();
//      metricValues.put(currentTime, metricSeries.nextValue());
//      timelineMetric.setMetricValues(metricValues);
//      timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
//      LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
//    }
//    try {
//      LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
//      metricStore.putMetrics(timelineMetrics);
//    } catch (Exception e) {
//      LOG.error(e);
//    }
//  }
}
