http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java new file mode 100644 index 0000000..7735d6c --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ambari.metrics.alertservice.prototype;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

/**
 * Prototype Spark Streaming job that reads {@link TimelineMetrics} JSON messages from a Kafka
 * topic, runs the EMA technique on every metric of the selected appIds and publishes the
 * resulting anomalies to the metrics collector. It also periodically triggers Tukey's
 * (point-in-time) and KS-test / Hsdev (trend) analysis.
 */
public class MetricSparkConsumer {

  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
  private static String groupId = "ambari-metrics-group";
  private static String topicName = "ambari-metrics-topic";
  private static int numThreads = 1;

  // Start of the current scheduling window of each periodic detector. They are advanced from
  // inside rdd.foreach, i.e. in the JVM that executes the task.
  // NOTE(review): static fields are per JVM, so on a multi-executor cluster every executor keeps
  // its own copy of these windows - confirm this prototype is intended to run in local mode.
  private static long pitStartTime = System.currentTimeMillis();
  private static long ksStartTime = pitStartTime;
  private static long hdevStartTime = ksStartTime;

  /** Number of positional arguments read by {@link #main(String[])} (indices 0 to 16). */
  private static final int EXPECTED_ARG_COUNT = 17;

  public MetricSparkConsumer() {
  }

  /**
   * Starts the streaming job.
   *
   * <p>Positional arguments (an empty numeric argument falls back to the listed default):
   * <ol start="0">
   *   <li>comma-separated appIds to analyse</li>
   *   <li>collector host</li>
   *   <li>collector port</li>
   *   <li>collector protocol</li>
   *   <li>ZooKeeper quorum used by the Kafka receiver</li>
   *   <li>EMA w (default 0.5)</li>
   *   <li>EMA n (default 3)</li>
   *   <li>Tukey's n (default 3)</li>
   *   <li>point-in-time test interval in ms (default 5 min)</li>
   *   <li>point-in-time train interval in ms (default 15 min)</li>
   *   <li>file name handed to {@code TrendADSystem}</li>
   *   <li>KS test interval in ms (default 10 min)</li>
   *   <li>KS train interval in ms (default 10 min)</li>
   *   <li>Hsdev nhp (default 3)</li>
   *   <li>Hsdev interval in ms (default 30 min)</li>
   *   <li>Ambari server host</li>
   *   <li>cluster name</li>
   * </ol>
   */
  public static void main(String[] args) throws InterruptedException {

    // All 17 positions are read below; the old "< 5" check let shorter argument lists through
    // and then failed with ArrayIndexOutOfBoundsException.
    if (args.length < EXPECTED_ARG_COUNT) {
      System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum> "
        + "<ema_w> <ema_n> <tukeys_n> <pit_test_interval> <pit_train_interval> <file_name> "
        + "<ks_test_interval> <ks_train_interval> <hsdev_nhp> <hsdev_interval> <ambari_server_host> <cluster_name>");
      System.exit(1);
    }

    List<String> appIds = Arrays.asList(args[0].split(","));
    String collectorHost = args[1];
    String collectorPort = args[2];
    String collectorProtocol = args[3];
    String zkQuorum = args[4];

    double emaW = doubleArg(args[5], 0.5);
    // EMA n lives at index 6 (previously the emptiness check wrongly looked at index 8).
    double emaN = doubleArg(args[6], 3);
    double tukeysN = doubleArg(args[7], 3);

    long pitTestInterval = longArg(args[8], 5 * 60 * 1000);
    long pitTrainInterval = longArg(args[9], 15 * 60 * 1000);

    String fileName = args[10];
    long ksTestInterval = longArg(args[11], 10 * 60 * 1000);
    long ksTrainInterval = longArg(args[12], 10 * 60 * 1000);
    int hsdevNhp = intArg(args[13], 3);
    long hsdevInterval = longArg(args[14], 30 * 60 * 1000);

    String ambariServerHost = args[15];
    String clusterName = args[16];

    MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);

    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
      tukeysN,
      pitTestInterval,
      pitTrainInterval,
      ambariServerHost,
      clusterName);

    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
      ksTestInterval,
      ksTrainInterval,
      hsdevNhp,
      fileName);

    Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
    Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);

    JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));

    //Convert JSON string to TimelineMetrics.
    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
      @Override
      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
        return metrics;
      }
    });

    timelineMetricsStream.print();

    //Group TimelineMetric by AppId (taken from the first metric of each message).
    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
      timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
    );

    appMetricStream.print();

    //Filter AppIds that are not needed.
    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
        return appIds.contains(appMetricTuple._1);
      }
    });

    filteredAppMetricStream.print();

    filteredAppMetricStream.foreachRDD(rdd -> {
      rdd.foreach(
        tuple2 -> {
          long currentTime = System.currentTimeMillis();
          EmaTechnique ema = emaTechniqueBroadcast.getValue();
          // Each periodic detector runs once its window has elapsed; the window then advances
          // by exactly one interval.
          if (currentTime > pitStartTime + pitTestInterval) {
            LOG.info("Running Tukeys....");
            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
            pitStartTime = pitStartTime + pitTestInterval;
          }

          if (currentTime > ksStartTime + ksTestInterval) {
            LOG.info("Running KS Test....");
            trendADSystemBroadcast.getValue().runKSTest(currentTime);
            ksStartTime = ksStartTime + ksTestInterval;
          }

          if (currentTime > hdevStartTime + hsdevInterval) {
            LOG.info("Running HSdev Test....");
            trendADSystemBroadcast.getValue().runHsdevMethod();
            hdevStartTime = hdevStartTime + hsdevInterval;
          }

          // Score every metric of the message with EMA and publish whatever it flags.
          TimelineMetrics metrics = tuple2._2();
          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
            List<MetricAnomaly> anomalies = ema.test(timelineMetric);
            metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
          }
        });
    });

    jssc.start();
    jssc.awaitTermination();
  }

  /** Parses {@code value} as a double, or returns {@code defaultValue} when it is null or empty. */
  private static double doubleArg(String value, double defaultValue) {
    return StringUtils.isNotEmpty(value) ? Double.parseDouble(value) : defaultValue;
  }

  /** Parses {@code value} as a long, or returns {@code defaultValue} when it is null or empty. */
  private static long longArg(String value, long defaultValue) {
    return StringUtils.isNotEmpty(value) ? Long.parseLong(value) : defaultValue;
  }

  /** Parses {@code value} as an int, or returns {@code defaultValue} when it is null or empty. */
  private static int intArg(String value, int defaultValue) {
    return StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : defaultValue;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java new file mode 100644 index 0000000..7b3f63d --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ambari.metrics.alertservice.prototype;

import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;

/**
 * HTTP client for the metrics collector's timeline endpoint ({@value #WS_V1_TIMELINE_METRICS}).
 * Publishes detected anomalies as timeline metrics and fetches historical metric values.
 */
public class MetricsCollectorInterface implements Serializable {

  // Resolved lazily by getDefaultLocalHostName(); static, so it is not serialized with instances.
  private static String hostName = null;
  private String instanceId = null;
  public final static String serviceName = "anomaly-engine";
  private String collectorHost;
  private String protocol;
  private String port;
  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
  private static ObjectMapper mapper;
  private final static ObjectReader timelineObjectReader;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    // getSerializationConfig().withSerializationInclusion(...) returns a new config object that
    // was discarded, so NON_NULL never took effect. Set it on the mapper instead.
    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
    timelineObjectReader = mapper.reader(TimelineMetrics.class);
  }

  /**
   * @param collectorHost collector host name
   * @param protocol      URL scheme used to reach the collector (e.g. http)
   * @param port          collector port
   */
  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
    this.collectorHost = collectorHost;
    this.protocol = protocol;
    this.port = port;
    hostName = getDefaultLocalHostName();
  }

  /**
   * Returns the canonical local host name, resolving and caching it on first use.
   *
   * @return the host name, or null if it could not be resolved
   */
  public static String getDefaultLocalHostName() {
    if (hostName == null) {
      try {
        hostName = InetAddress.getLocalHost().getCanonicalHostName();
      } catch (UnknownHostException e) {
        LOG.info("Error getting host address");
      }
    }
    return hostName;
  }

  /**
   * Converts the anomalies to timeline metrics and posts them to the collector.
   * Null or empty input is only logged.
   */
  public void publish(List<MetricAnomaly> metricAnomalies) {
    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
      if (!metricList.isEmpty()) {
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.setMetrics(metricList);
        emitMetrics(timelineMetrics);
      }
    } else {
      LOG.info("No anomalies to send.");
    }
  }

  /**
   * Builds one single-point timeline metric per anomaly. The appId is
   * "anomaly-engine-&lt;method&gt;", and the method and anomaly score go into the metadata.
   */
  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
    List<TimelineMetric> metrics = new ArrayList<>();

    if (metricAnomalies.isEmpty()) {
      return metrics;
    }

    for (MetricAnomaly anomaly : metricAnomalies) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(anomaly.getMetricKey());
      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
      timelineMetric.setInstanceId(null);
      timelineMetric.setHostName(getDefaultLocalHostName());
      timelineMetric.setStartTime(anomaly.getTimestamp());
      HashMap<String, String> metadata = new HashMap<>();
      metadata.put("method", anomaly.getMethodType());
      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
      timelineMetric.setMetadata(metadata);
      TreeMap<Long,Double> metricValues = new TreeMap<>();
      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
      timelineMetric.setMetricValues(metricValues);

      metrics.add(timelineMetric);
    }
    return metrics;
  }

  /**
   * Serializes {@code metrics} to JSON and POSTs them to the collector.
   *
   * @return true only if the collector answered HTTP 200
   */
  public boolean emitMetrics(TimelineMetrics metrics) {
    String connectUrl = constructTimelineMetricUri();
    String jsonData = null;
    LOG.info("EmitMetrics connectUrl = " + connectUrl);
    try {
      jsonData = mapper.writeValueAsString(metrics);
      LOG.info(jsonData);
    } catch (IOException e) {
      LOG.error("Unable to parse metrics", e);
    }
    if (jsonData != null) {
      return emitMetricsJson(connectUrl, jsonData);
    }
    return false;
  }

  private HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  /**
   * POSTs a JSON body with 10s connect/read timeouts.
   *
   * @return true on HTTP 200; false on any other status or I/O failure
   */
  private boolean emitMetricsJson(String connectUrl, String jsonData) {
    int timeout = 10000;
    try {
      if (connectUrl == null) {
        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
      }
      HttpURLConnection connection = getConnection(connectUrl);

      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(timeout);
      connection.setReadTimeout(timeout);
      connection.setDoOutput(true);

      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes(StandardCharsets.UTF_8));
        }
      }

      int statusCode = connection.getResponseCode();

      if (statusCode != 200) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
          "statusCode = " + statusCode);
        // Previously this path still reported success.
        return false;
      }
      LOG.info("Metrics posted to Collector " + connectUrl);
      return true;
    } catch (IOException ioe) {
      LOG.error(ioe.getMessage(), ioe);
    }
    return false;
  }

  /** Returns protocol://host:port/ws/v1/timeline/metrics for the configured collector. */
  private String constructTimelineMetricUri() {
    StringBuilder sb = new StringBuilder(protocol);
    sb.append("://");
    sb.append(collectorHost);
    sb.append(":");
    sb.append(port);
    sb.append(WS_V1_TIMELINE_METRICS);
    return sb.toString();
  }

  /**
   * Fetches the values of one metric for [startime, endtime] from the collector.
   *
   * @return the fetched metrics, or an empty {@link TimelineMetrics} if the request fails
   */
  public TimelineMetrics fetchMetrics(String metricName,
                                      String appId,
                                      String hostname,
                                      long startime,
                                      long endtime) {

    // Query parameters are URL-encoded so names containing reserved characters survive the trip.
    String url = constructTimelineMetricUri() + "?metricNames=" + encodeQueryParam(metricName) + "&appId=" + encodeQueryParam(appId) +
      "&hostname=" + encodeQueryParam(hostname) + "&startTime=" + startime + "&endTime=" + endtime;
    LOG.info("Fetch metrics URL : " + url);

    TimelineMetrics timelineMetrics = new TimelineMetrics();

    try {
      HttpURLConnection con = getConnection(url);
      con.setRequestMethod("GET");
      LOG.info("Sending 'GET' request to URL : " + url);
      int responseCode = con.getResponseCode();
      LOG.info("Response Code : " + responseCode);

      try (BufferedReader in = new BufferedReader(
        new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
        TimelineMetrics fetched = timelineObjectReader.readValue(in);
        if (fetched != null) {
          timelineMetrics = fetched;
        }
      }
    } catch (Exception e) {
      LOG.error("Error fetching metrics from URL : " + url, e);
    }

    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
    return timelineMetrics;
  }

  /** URL-encodes a query parameter value as UTF-8; null is encoded as the string "null". */
  private static String encodeQueryParam(String value) {
    try {
      return URLEncoder.encode(String.valueOf(value), "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // Every JVM is required to support UTF-8.
      throw new IllegalStateException(e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java new file mode 100644 index 0000000..b4a8593 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype;

import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Point-in-time anomaly detection. For every metric tracked by the EMA technique, it runs
 * Tukey's method (via R) on recent data and compares its anomalies with the ones EMA reported.
 * It then nudges the EMA model's sensitivity up when recall is low, or down when precision is low.
 */
public class PointInTimeADSystem implements Serializable {

  private MetricsCollectorInterface metricsCollectorInterface;
  // Tukey's n per metric key, seeded from defaultTukeysN on first use.
  private Map<String, Double> tukeysNMap;
  private double defaultTukeysN = 3;

  private long testIntervalMillis = 5*60*1000; //5mins
  private long trainIntervalMillis = 15*60*1000; //15mins

  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);

  private AmbariServerInterface ambariServerInterface;
  // Current sensitivity; each unit step scales defaultTukeysN by 10%.
  private int sensitivity = 50;
  private int minSensitivity = 0;
  // Was 10, below the initial sensitivity of 50, which made every increase a no-op.
  private int maxSensitivity = 100;

  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
    this.metricsCollectorInterface = metricsCollectorInterface;
    this.defaultTukeysN = defaultTukeysN;
    this.tukeysNMap = new HashMap<>();
    this.testIntervalMillis = testIntervalMillis;
    this.trainIntervalMillis = trainIntervalMillis;
    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
    LOG.info("Starting PointInTimeADSystem...");
  }

  /**
   * Runs Tukey's method for every EMA-tracked metric and refines the EMA models from the result.
   * The test window is [startTime - test, startTime]; the train window is the trainIntervalMillis
   * before it. Tukey's anomalies are emitted to the collector.
   *
   * @param emaTechnique technique whose tracked models are evaluated and updated in place
   * @param startTime    end of the test window (epoch ms)
   */
  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");

    adjustTukeysNForSensitivity();

    TimelineMetrics timelineMetrics = new TimelineMetrics();
    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
      LOG.info("EMA key = " + metricKey);
      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
      String metricName = emaModel.getMetricName();
      String appId = emaModel.getAppId();
      String hostname = emaModel.getHostname();

      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
        startTime);

      if (tukeysData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
        continue;
      }

      // Points at or before the train/test boundary are training data; later points are test data.
      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();

      for (TimelineMetric metric : tukeysData.getMetrics()) {
        for (Map.Entry<Long, Double> point : metric.getMetricValues().entrySet()) {
          long timestamp = point.getKey();
          if (timestamp <= (startTime - testIntervalMillis)) {
            trainDataList.add(point.getValue());
            trainTsList.add((double) timestamp);
          } else {
            testDataList.add(point.getValue());
            testTsList.add((double) timestamp);
          }
        }
      }

      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
        LOG.info("Not enough train/test data to perform analysis.");
        continue;
      }

      double[] trainTs = toPrimitiveArray(trainTsList);
      double[] trainData = toPrimitiveArray(trainDataList);
      double[] testTs = toPrimitiveArray(testTsList);
      double[] testData = toPrimitiveArray(testDataList);

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries tukeysTrainData = new DataSeries("tukeysTrainSeries", trainTs, trainData);
      DataSeries tukeysTestData = new DataSeries("tukeysTestSeries", testTs, testData);

      tukeysNMap.putIfAbsent(metricKey, defaultTukeysN);

      Map<String, String> configs = new HashMap<>();
      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));

      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);

      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();

      for (TimelineMetric tukeysMetric : tukeysMetrics) {
        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
      }

      // Anomalies the EMA technique already published for this metric during the test window.
      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
      TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
      if (!emaData.getMetrics().isEmpty()) {
        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
      }

      LOG.info("Ema anomalies size : " + emaMetricValues.size());

      // Confusion matrix with Tukey's anomalies treated as ground truth and EMA's as predictions.
      int tp = 0;
      int tn = 0;
      int fp = 0;
      int fn = 0;

      for (double ts : testTs) {
        long timestamp = (long) ts;
        if (tukeysMetricValues.containsKey(timestamp)) {
          if (emaMetricValues.containsKey(timestamp)) {
            tp++;
          } else {
            fn++;
          }
        } else {
          if (emaMetricValues.containsKey(timestamp)) {
            fp++;
          } else {
            tn++;
          }
        }
      }

      // 0/0 yields NaN. NaN comparisons are false, so an undefined ratio never triggers an update.
      double recall = (double) tp / (double) (tp + fn);
      double precision = (double) tp / (double) (tp + fp);
      LOG.info("----------------------------");
      LOG.info("Precision Recall values for " + metricKey);
      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
      LOG.info("precision=" + precision + ", recall=" + recall);
      LOG.info("----------------------------");

      if (recall < 0.5) {
        LOG.info("Increasing EMA sensitivity by 10%");
        emaModel.updateModel(true, 10);
      } else if (precision < 0.5) {
        LOG.info("Decreasing EMA sensitivity by 10%");
        emaModel.updateModel(false, 10);
      }

    }

    if (emaTechnique.getTrackedEmas().isEmpty()){
      LOG.info("No EMA Technique keys tracked!!!!");
    }

    if (!timelineMetrics.getMetrics().isEmpty()) {
      metricsCollectorInterface.emitMetrics(timelineMetrics);
    }
  }

  /**
   * Moves {@code sensitivity} one step at a time toward the value requested by the Ambari server,
   * clamped to [minSensitivity, maxSensitivity]. Each step up multiplies defaultTukeysN by 1.1,
   * and each step down multiplies it by 0.9. A request of -1 means "no change".
   */
  private void adjustTukeysNForSensitivity() {
    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
      LOG.info("No change in sensitivity needed.");
      return;
    }

    LOG.info("Current tukey's N value = " + defaultTukeysN);
    if (requiredSensivity > sensitivity) {
      int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
      while (sensitivity < targetSensitivity) {
        defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
        sensitivity++;
      }
    } else {
      int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
      while (sensitivity > targetSensitivity) {
        defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
        sensitivity--;
      }
    }
    // Per-metric values are only seeded from defaultTukeysN on first use. Drop them so the new
    // value also applies to metrics that are already tracked.
    tukeysNMap.clear();
    LOG.info("New tukey's N value = " + defaultTukeysN);
  }

  /** Unboxes a list of Doubles into a primitive array of the same length and order. */
  private static double[] toPrimitiveArray(List<Double> values) {
    double[] out = new double[values.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = values.get(i);
    }
    return out;
  }

  /**
   * Converts the Tukey's R result into one timeline metric per anomalous point.
   * Result columns are used as: [0] timestamps, [1] values, [2] anomaly scores.
   *
   * @return the anomalies; an empty list (never null) when there is no usable result
   */
  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {

    List<TimelineMetric> timelineMetrics = new ArrayList<>();

    if (result == null) {
      LOG.info("ResultSet from R call is null!!");
      // Empty rather than null: the caller calls size() on the returned list.
      return timelineMetrics;
    }

    if (result.resultset.size() < 3) {
      // Nothing to convert (also guards the get(1)/get(2) lookups below).
      return timelineMetrics;
    }

    double[] ts = result.resultset.get(0);
    double[] metrics = result.resultset.get(1);
    double[] anomalyScore = result.resultset.get(2);
    for (int i = 0; i < ts.length; i++) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
      timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
      timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
      timelineMetric.setInstanceId(null);
      timelineMetric.setStartTime((long) ts[i]);
      TreeMap<Long, Double> metricValues = new TreeMap<>();
      metricValues.put((long) ts[i], metrics[i]);

      HashMap<String, String> metadata = new HashMap<>();
      metadata.put("method", "tukeys");
      metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
      timelineMetric.setMetadata(metadata);

      timelineMetric.setMetricValues(metricValues);
      timelineMetrics.add(timelineMetric);
    }

    return timelineMetrics;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java ---------------------------------------------------------------------- diff --git
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java new file mode 100644 index 0000000..4fdf27d --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ambari.metrics.alertservice.prototype;


import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RVector;
import org.rosuda.JRI.Rengine;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Invokes the anomaly-detection R scripts located in {@code rScriptDir} through one shared,
 * process-wide JRI {@link Rengine}.
 *
 * <p>Every invocation passes data through R global variables (train_data, test_data, n, an, ...).
 * The public methods are therefore synchronized, so concurrent callers cannot overwrite each
 * other's inputs.
 *
 * <p>The engine is deliberately NOT ended after each call. {@link Rengine#end()} shuts R down
 * and JRI cannot start it again in the same JVM. Ending it in a {@code finally} block (as this
 * class previously did) broke every call after the first.
 */
public class RFunctionInvoker {

  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
  private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";

  /**
   * Binds train_data / test_data data frames (columns "TS" and the series name) in the R
   * global environment.
   */
  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
    r.assign("train_ts", trainData.ts);
    r.assign("train_x", trainData.values);
    r.eval("train_data <- data.frame(train_ts,train_x)");
    // The series name must be an R string literal; unquoted, R looked it up as a variable.
    r.eval("names(train_data) <- c(\"TS\", " + toRString(trainData.seriesName) + ")");

    r.assign("test_ts", testData.ts);
    r.assign("test_x", testData.values);
    r.eval("test_data <- data.frame(test_ts,test_x)");
    r.eval("names(test_data) <- c(\"TS\", " + toRString(testData.seriesName) + ")");
  }

  /** Quotes {@code value} as an R string literal, escaping backslashes and double quotes. */
  private static String toRString(String value) {
    return "\"" + String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  /** Sources {@code scriptName} from {@code rScriptDir} into the shared engine. */
  private static void sourceScript(String scriptName) {
    r.eval("source('" + rScriptDir + "/" + scriptName + "')");
  }

  /**
   * Reads the R vector/list bound to {@code variable} and converts every element to a
   * {@code double[]}. Throws (e.g. NullPointerException or ClassCastException) when the
   * variable is missing or has another type; the callers catch and log that.
   */
  private static ResultSet readResultSet(String variable) {
    REXP exp = r.eval(variable);
    RVector cont = (RVector) exp.getContent();
    List<double[]> result = new ArrayList<>(cont.size());
    for (int i = 0; i < cont.size(); i++) {
      result.add(cont.at(i).asDoubleArray());
    }
    return new ResultSet(result);
  }

  public static void setScriptsDir(String dir) {
    rScriptDir = dir;
  }

  /**
   * Dispatches to the named method ("tukeys", "ema", "ks", "hsdev"); unknown names fall back to
   * Tukey's.
   *
   * @return the R result, or null if the invocation failed
   */
  public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {

    ResultSet result;
    switch (methodType) {
      case "tukeys":
        result = tukeys(trainData, testData, configs);
        break;
      case "ema":
        result = ema_global(trainData, testData, configs);
        break;
      case "ks":
        result = ksTest(trainData, testData, configs);
        break;
      case "hsdev":
        result = hsdev(trainData, testData, configs);
        break;
      default:
        result = tukeys(trainData, testData, configs);
        break;
    }
    return result;
  }

  /**
   * Runs ams_tukeys from tukeys.r. Reads config "tukeys.n".
   *
   * @return the R result, or null on failure (logged)
   */
  public static synchronized ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("tukeys.r");

      double n = Double.parseDouble(configs.get("tukeys.n"));
      r.eval("n <- " + n);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ams_tukeys(train_data, test_data, n)");
      return readResultSet("an");
    } catch (Exception e) {
      LOG.error("Error invoking R function ams_tukeys", e);
    }
    return null;
  }

  /**
   * Runs ema_global from ema.r. Reads configs "ema.n" (int) and "ema.w" (double).
   *
   * @return the R result, or null on failure (logged)
   */
  public static synchronized ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("ema.r");

      int n = Integer.parseInt(configs.get("ema.n"));
      r.eval("n <- " + n);

      double w = Double.parseDouble(configs.get("ema.w"));
      r.eval("w <- " + w);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ema_global(train_data, test_data, w, n)");
      return readResultSet("an");
    } catch (Exception e) {
      LOG.error("Error invoking R function ema_global", e);
    }
    return null;
  }

  /**
   * Runs ema_daily from ema.r. Reads configs "ema.n" (int) and "ema.w" (double).
   *
   * @return the R result, or null on failure (logged)
   */
  public static synchronized ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("ema.r");

      int n = Integer.parseInt(configs.get("ema.n"));
      r.eval("n <- " + n);

      double w = Double.parseDouble(configs.get("ema.w"));
      r.eval("w <- " + w);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ema_daily(train_data, test_data, w, n)");
      return readResultSet("an");
    } catch (Exception e) {
      LOG.error("Error invoking R function ema_daily", e);
    }
    return null;
  }

  /**
   * Runs ams_ks from kstest.r. Reads config "ks.p_value".
   *
   * @return the R result, or null on failure (logged)
   */
  public static synchronized ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("kstest.r");

      double p_value = Double.parseDouble(configs.get("ks.p_value"));
      r.eval("p_value <- " + p_value);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ams_ks(train_data, test_data, p_value)");
      return readResultSet("an");
    } catch (Exception e) {
      LOG.error("Error invoking R function ams_ks", e);
    }
    return null;
  }

  /**
   * Runs hsdev_daily from hsdev.r. Reads configs "hsdev.n", "hsdev.nhp" (int) and
   * "hsdev.interval", "hsdev.period" (long).
   *
   * @return the R result, or null on failure (logged)
   */
  public static synchronized ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("hsdev.r");

      int n = Integer.parseInt(configs.get("hsdev.n"));
      r.eval("n <- " + n);

      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
      r.eval("nhp <- " + nhp);

      long interval = Long.parseLong(configs.get("hsdev.interval"));
      r.eval("interval <- " + interval);

      long period = Long.parseLong(configs.get("hsdev.period"));
      r.eval("period <- " + period);

      loadDataSets(r, trainData, testData);

      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
      return readResultSet("an2");
    } catch (Exception e) {
      LOG.error("Error invoking R function hsdev_daily", e);
    }
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java new file mode 100644 index 0000000..7485f01 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Request describing a test series: its name, its type and a map of configuration values.
+ * <p>
+ * Two requests are equal when their series names are equal; type and configs are ignored.
+ */
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+  private String seriesName;
+  private String seriesType;
+  private Map<String, String> configs;
+
+  public TestSeriesInputRequest() {
+  }
+
+  public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+    this.seriesName = seriesName;
+    this.seriesType = seriesType;
+    this.configs = configs;
+  }
+
+  public String getSeriesName() {
+    return seriesName;
+  }
+
+  public void setSeriesName(String seriesName) {
+    this.seriesName = seriesName;
+  }
+
+  public String getSeriesType() {
+    return seriesType;
+  }
+
+  public void setSeriesType(String seriesType) {
+    this.seriesType = seriesType;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  /**
+   * Compares by series name only. Returns {@code false} (instead of throwing) for {@code null}
+   * or an object of another type, and treats two null series names as equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TestSeriesInputRequest)) {
+      return false;
+    }
+    TestSeriesInputRequest anotherInput = (TestSeriesInputRequest) o;
+    return Objects.equals(anotherInput.getSeriesName(), this.getSeriesName());
+  }
+
+  /** Hash of the series name (0 when it is null), consistent with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(seriesName);
+  }
+
+  /** Demo entry point: prints a sample request serialized as JSON. */
+  public static void main(String[] args) {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+    try {
+      System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java new file mode 100644 index 0000000..1534b55 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Two-stage trend anomaly detection over the metrics listed in an input file.
+ * <p>
+ * {@link #runKSTest(long)} runs a KS test for every tracked metric, comparing the latest test
+ * window with the train window just before it, emits each anomaly found to the collector and
+ * remembers the run. {@link #runHsdevMethod()} then re-checks every remembered run with the
+ * Hsdev technique against a train window of {@code hsdevNumHistoricalPeriods} test-window
+ * lengths preceding the test window, emits confirmed anomalies, calls
+ * {@code ksTechnique.updateModel(key, false, 10)} for unconfirmed ones, and forgets all runs.
+ */
+public class TrendADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private List<TrendMetric> trendMetrics;
+
+  private long ksTestIntervalMillis = 10 * 60 * 1000;
+  private long ksTrainIntervalMillis = 10 * 60 * 1000;
+  private KSTechnique ksTechnique;
+
+  private HsdevTechnique hsdevTechnique;
+  private int hsdevNumHistoricalPeriods = 3;
+
+  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+  private String inputFile = "";
+
+  /**
+   * @param metricsCollectorInterface source of metric data and sink for anomaly metrics
+   * @param ksTestIntervalMillis      length of the KS test window
+   * @param ksTrainIntervalMillis     length of the KS train window preceding the test window
+   * @param hsdevNumHistoricalPeriods number of test-window lengths used as Hsdev train data
+   * @param inputFileName             file listing the metrics to track (read immediately and
+   *                                  again before every KS run)
+   */
+  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+                       long ksTestIntervalMillis,
+                       long ksTrainIntervalMillis,
+                       int hsdevNumHistoricalPeriods,
+                       String inputFileName) {
+
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.ksTestIntervalMillis = ksTestIntervalMillis;
+    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+    this.ksTechnique = new KSTechnique();
+    this.hsdevTechnique = new HsdevTechnique();
+
+    trendMetrics = new ArrayList<>();
+    this.inputFile = inputFileName;
+    readInputFile(inputFileName);
+  }
+
+  /**
+   * Re-reads the input file, then runs the KS test for every tracked metric. Points after
+   * {@code currentEndTime - ksTestIntervalMillis} form the test data; the preceding
+   * {@code ksTrainIntervalMillis} form the train data. Each anomaly found is emitted to the
+   * collector and remembered for {@link #runHsdevMethod()}.
+   */
+  public void runKSTest(long currentEndTime) {
+    readInputFile(inputFile);
+
+    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+    long ksTrainIntervalStartTime = ksTestIntervalStartTime - ksTrainIntervalMillis;
+    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+      new Date(currentEndTime) + "], with train data period [" + new Date(ksTrainIntervalStartTime)
+      + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+    for (TrendMetric metric : trendMetrics) {
+      String metricName = metric.metricName;
+      String appId = metric.appId;
+      String hostname = metric.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname,
+        ksTrainIntervalStartTime, currentEndTime);
+
+      if (isEmpty(ksData)) {
+        LOG.info("No metrics fetched for KS, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest = splitAt(ksData, ksTestIntervalStartTime, "KSTrainSeries", "KSTestSeries");
+      if (!hasEnoughData(trainAndTest[0], trainAndTest[1])) {
+        LOG.info("Not enough train/test data to perform KS analysis.");
+        continue;
+      }
+      DataSeries ksTrainData = trainAndTest[0];
+      DataSeries ksTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + ksTrainData.ts.length + ", Test Size = " + ksTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from KS test.");
+      } else {
+        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+          ksTestIntervalStartTime, currentEndTime, ksTrainIntervalStartTime, ksTestIntervalStartTime);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+      }
+    }
+
+    if (trendMetrics.isEmpty()) {
+      LOG.info("No Trend metrics tracked!!!!");
+    }
+
+  }
+
+  /** True when the fetch returned nothing usable (null result, null or empty metric list). */
+  private static boolean isEmpty(TimelineMetrics data) {
+    return data == null || data.getMetrics() == null || data.getMetrics().isEmpty();
+  }
+
+  /**
+   * Splits every datapoint of {@code data} at {@code splitTime}: points with a timestamp
+   * {@code <= splitTime} go to the train series, later points to the test series.
+   *
+   * @return a two-element array {@code {trainSeries, testSeries}}
+   */
+  private static DataSeries[] splitAt(TimelineMetrics data, long splitTime,
+                                      String trainSeriesName, String testSeriesName) {
+    List<Double> trainTsList = new ArrayList<>();
+    List<Double> trainDataList = new ArrayList<>();
+    List<Double> testTsList = new ArrayList<>();
+    List<Double> testDataList = new ArrayList<>();
+
+    for (TimelineMetric timelineMetric : data.getMetrics()) {
+      for (Map.Entry<Long, Double> point : timelineMetric.getMetricValues().entrySet()) {
+        long timestamp = point.getKey();
+        if (timestamp <= splitTime) {
+          trainTsList.add((double) timestamp);
+          trainDataList.add(point.getValue());
+        } else {
+          testTsList.add((double) timestamp);
+          testDataList.add(point.getValue());
+        }
+      }
+    }
+    return new DataSeries[]{
+      new DataSeries(trainSeriesName, toArray(trainTsList), toArray(trainDataList)),
+      new DataSeries(testSeriesName, toArray(testTsList), toArray(testDataList))
+    };
+  }
+
+  /** Unboxes {@code values} into a new primitive array. */
+  private static double[] toArray(List<Double> values) {
+    double[] result = new double[values.size()];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = values.get(i);
+    }
+    return result;
+  }
+
+  /**
+   * True when both series are non-empty and the train series has at least as many points as
+   * the test series.
+   */
+  private static boolean hasEnoughData(DataSeries train, DataSeries test) {
+    return train.ts.length > 0 && test.ts.length > 0 && train.ts.length >= test.ts.length;
+  }
+
+  /**
+   * Wraps an anomaly as a single-point timeline metric at {@code testEnd}, with the method,
+   * score and window boundaries recorded in the metadata.
+   */
+  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+                                             long testStart,
+                                             long testEnd,
+                                             long trainStart,
+                                             long trainEnd) {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+    timelineMetric.setInstanceId(null);
+    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    timelineMetric.setStartTime(testEnd);
+    HashMap<String, String> metadata = new HashMap<>();
+    metadata.put("method", metricAnomaly.getMethodType());
+    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+    metadata.put("test-start-time", String.valueOf(testStart));
+    metadata.put("train-start-time", String.valueOf(trainStart));
+    metadata.put("train-end-time", String.valueOf(trainEnd));
+    timelineMetric.setMetadata(metadata);
+    TreeMap<Long,Double> metricValues = new TreeMap<>();
+    metricValues.put(testEnd, metricAnomaly.getMetricValue());
+    timelineMetric.setMetricValues(metricValues);
+    return timelineMetric;
+
+  }
+
+  /**
+   * Re-checks every KS anomaly remembered since the last call with the Hsdev technique, emits
+   * the confirmed ones in one batch, and clears the remembered runs.
+   */
+  public void runHsdevMethod() {
+
+    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+      long hsdevTestEnd = ksSingleRunKey.endTime;
+      long hsdevTestStart = ksSingleRunKey.startTime;
+
+      long period = hsdevTestEnd - hsdevTestStart;
+
+      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+      long hsdevTrainEnd = hsdevTestStart;
+
+      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart) +
+        " : " + new Date(hsdevTrainEnd) + "]");
+
+      String metricName = ksSingleRunKey.metricName;
+      String appId = ksSingleRunKey.appId;
+      String hostname = ksSingleRunKey.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+        metricName,
+        appId,
+        hostname,
+        hsdevTrainStart,
+        hsdevTestEnd);
+
+      if (isEmpty(hsdevData)) {
+        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest = splitAt(hsdevData, hsdevTestStart, "HsdevTrainSeries", "HsdevTestSeries");
+      if (!hasEnoughData(trainAndTest[0], trainAndTest[1])) {
+        LOG.info("Not enough train/test data to perform Hsdev analysis.");
+        continue;
+      }
+      DataSeries hsdevTrainData = trainAndTest[0];
+      DataSeries hsdevTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + hsdevTrainData.ts.length + ", Test Size = " + hsdevTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+        // NOTE(review): meaning of the (false, 10) arguments is defined by KSTechnique.
+        ksTechnique.updateModel(key, false, 10);
+      } else {
+        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+      }
+    }
+    clearTrackedKsRunKeys();
+
+    if (!hsdevMetricAnomalies.isEmpty()) {
+      LOG.info("Publishing Hsdev Anomalies....");
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(hsdevMetricAnomalies);
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private void clearTrackedKsRunKeys() {
+    trackedKsAnomalies.clear();
+  }
+
+  /**
+   * Replaces the tracked metrics with the entries of {@code fileName}, one
+   * {@code metricName,appId,hostname} entry per line (fields are trimmed). Blank lines are
+   * ignored and lines with fewer than three comma-separated fields are skipped with a warning
+   * instead of aborting the whole read. If the file cannot be read the error is logged and
+   * only the entries read before the failure (possibly none) are tracked.
+   */
+  private void readInputFile(String fileName) {
+    trendMetrics.clear();
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      for (String line; (line = br.readLine()) != null; ) {
+        if (line.trim().isEmpty()) {
+          continue;
+        }
+        String[] splits = line.split(",");
+        if (splits.length < 3) {
+          LOG.warn("Skipping malformed line in Trend AD input file " + fileName + " : " + line);
+          continue;
+        }
+        String metricName = splits[0].trim();
+        LOG.info("Adding a new metric to track in Trend AD system : " + metricName);
+        trendMetrics.add(new TrendMetric(metricName, splits[1].trim(), splits[2].trim()));
+      }
+    } catch (IOException e) {
+      LOG.error("Error reading input file : " + fileName, e);
+    }
+  }
+
+  /**
+   * Identifies one KS run (test window plus metric) whose anomaly awaits Hsdev confirmation.
+   * Used as a {@link HashMap} key, so it defines value-based equality.
+   */
+  static class KsSingleRunKey implements Serializable {
+
+    long startTime;
+    long endTime;
+    String metricName;
+    String appId;
+    String hostname;
+
+    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.metricName = metricName;
+      this.appId = appId;
+      this.hostname = hostname;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof KsSingleRunKey)) {
+        return false;
+      }
+      KsSingleRunKey other = (KsSingleRunKey) o;
+      return startTime == other.startTime
+        && endTime == other.endTime
+        && Objects.equals(metricName, other.metricName)
+        && Objects.equals(appId, other.appId)
+        && Objects.equals(hostname, other.hostname);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(startTime, endTime, metricName, appId, hostname);
+    }
+  }
+
+  // Earlier de-duplicating variant of the readInputFile loop body, kept for reference.
+  /*
+  boolean isPresent = false;
+  for (TrendMetric trendMetric : trendMetrics) {
+    if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
+      isPresent = true;
+    }
+  }
+  if (!isPresent) {
+    LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+    trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+  }
+  */
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java new file mode 100644 index 0000000..3bead8b --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import java.io.Serializable;
+
+/**
+ * A metric to be tracked for trend anomalies, identified by metric name, the app id it is
+ * reported under and the host it comes from.
+ */
+public class TrendMetric implements Serializable {
+
+  String metricName;
+  String appId;
+  String hostname;
+
+  /** Captures the three identifying fields as given (no validation). */
+  public TrendMetric(String metricName, String appId, String hostname) {
+    this.hostname = hostname;
+    this.appId = appId;
+    this.metricName = metricName;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
new file mode 100644
index 0000000..eb19857
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+import java.util.Arrays;
+
+/**
+ * A named series held as a timestamp array and a value array (presumably of equal length;
+ * not checked here).
+ */
+public class DataSeries {
+
+  public String seriesName;
+  public double[] ts;
+  public double[] values;
+
+  public DataSeries(String seriesName, double[] ts, double[] values) {
+    this.values = values;
+    this.ts = ts;
+    this.seriesName = seriesName;
+  }
+
+  /** The series name directly followed by the timestamp array and then the value array. */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+      .append(seriesName)
+      .append(Arrays.toString(ts))
+      .append(Arrays.toString(values))
+      .toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
new file mode 100644
index 0000000..101b0e9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A list of result columns, each a {@code double[]}.
+ */
+public class ResultSet {
+
+  public List<double[]> resultset = new ArrayList<>();
+
+  public ResultSet(List<double[]> resultset) {
+    this.resultset = resultset;
+  }
+
+  /**
+   * Prints "Result : " and then the columns side by side to stdout, one row per line, each
+   * value followed by a space.
+   * <p>
+   * Columns may differ in length: the row count is that of the longest column, and a shorter
+   * (or null) column simply contributes nothing to the rows it does not reach. Previously a
+   * column shorter than the first one threw {@link ArrayIndexOutOfBoundsException}, and a
+   * null list threw {@link NullPointerException}.
+   */
+  public void print() {
+    System.out.println("Result : ");
+    if (resultset == null || resultset.isEmpty()) {
+      return;
+    }
+    int rows = 0;
+    for (double[] column : resultset) {
+      if (column != null) {
+        rows = Math.max(rows, column.length);
+      }
+    }
+    for (int i = 0; i < rows; i++) {
+      for (double[] column : resultset) {
+        if (column != null && i < column.length) {
+          System.out.print(column[i] + " ");
+        }
+      }
+      System.out.println();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
new file mode 100644
index 0000000..4ea4ac5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/** Basic descriptive statistics over {@code double[]} samples. */
+public class StatisticUtils {
+
+  /** Arithmetic mean; {@code NaN} for an empty array. */
+  public static double mean(double[] values) {
+    double sum = 0;
+    for (double d : values) {
+      sum += d;
+    }
+    return sum / values.length;
+  }
+
+  /**
+   * Sum of squared deviations from the mean.
+   * <p>
+   * Despite the name, this is NOT divided by the sample size; {@link #sdev(double[], boolean)}
+   * performs that division itself. Left unchanged because sdev (and possibly other callers)
+   * depend on it.
+   */
+  public static double variance(double[] values) {
+    double avg = mean(values);
+    double variance = 0;
+    for (double d : values) {
+      variance += Math.pow(d - avg, 2.0);
+    }
+    return variance;
+  }
+
+  /**
+   * Standard deviation: {@code sqrt(variance(values) / n)}, with {@code n = length - 1} when
+   * {@code useBesselsCorrection} is set and {@code n = length} otherwise.
+   */
+  public static double sdev(double[] values, boolean useBesselsCorrection) {
+    double variance = variance(values);
+    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
+    return Math.sqrt(variance / n);
+  }
+
+  /**
+   * Median of {@code values}: the middle element for odd lengths, the mean of the two middle
+   * elements for even lengths. The input array is not modified. Returns {@code NaN} for an
+   * empty array (consistent with {@link #mean}) instead of throwing
+   * {@link ArrayIndexOutOfBoundsException}.
+   */
+  public static double median(double[] values) {
+    int n = values.length;
+    if (n == 0) {
+      return Double.NaN;
+    }
+    double[] sorted = Arrays.copyOf(values, n);
+    Arrays.sort(sorted);
+    if (n % 2 != 0) {
+      return sorted[n / 2];
+    }
+    return (sorted[n / 2 - 1] + sorted[n / 2]) / 2;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
new file mode 100644
index 0000000..0b10b4b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.List;
+
+/**
+ * Base type for an anomaly detection technique that examines one timeline metric at a time.
+ */
+public abstract class AnomalyDetectionTechnique {
+
+  /** Name of the technique; not assigned here, presumably set by subclasses. */
+  protected String methodType;
+
+  /**
+   * Examines {@code metric} for anomalies.
+   *
+   * @param metric the metric series to examine
+   * @return the anomalies found (whether "none" is an empty list or null is left to
+   *         implementations)
+   */
+  public abstract List<MetricAnomaly> test(TimelineMetric metric);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..da4f030
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import java.io.Serializable;
+
+/**
+ * One anomaly reported by a detection technique: the metric key, the timestamp and observed
+ * value, the technique that flagged it and its anomaly score. Plain mutable bean.
+ */
+public class MetricAnomaly implements Serializable {
+
+  private String methodType;
+  private double anomalyScore;
+  private String metricKey;
+  private long timestamp;
+  private double metricValue;
+
+  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+    this.metricKey = metricKey;
+    this.timestamp = timestamp;
+    this.metricValue = metricValue;
+    this.methodType = methodType;
+    this.anomalyScore = anomalyScore;
+  }
+
+  // --- metric identity ---
+
+  public String getMetricKey() {
+    return metricKey;
+  }
+
+  public void setMetricKey(String metricKey) {
+    this.metricKey = metricKey;
+  }
+
+  /** Alias of {@link #setMetricKey(String)}: stores {@code metricName} as the metric key. */
+  public void setMetricName(String metricName) {
+    this.metricKey = metricName;
+  }
+
+  // --- observed datapoint ---
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public double getMetricValue() {
+    return metricValue;
+  }
+
+  public void setMetricValue(double metricValue) {
+    this.metricValue = metricValue;
+  }
+
+  // --- detection result ---
+
+  public String getMethodType() {
+    return methodType;
+  }
+
+  public void setMethodType(String methodType) {
+    this.methodType = methodType;
+  }
+
+  public double getAnomalyScore() {
+    return anomalyScore;
+  }
+
+  public void setAnomalyScore(double anomalyScore) {
+    this.anomalyScore = anomalyScore;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java new file mode 100644 index 0000000..5e1f76b --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.methods.ema; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; + +@XmlRootElement +public class EmaModel implements Serializable { + + private String metricName; + private String hostname; + private String appId; + private double ema; + private double ems; + private double weight; + private double timessdev; + + private int ctr = 0; + private static final int suppressAnomaliesTheshold = 30; + + private static final Log LOG = LogFactory.getLog(EmaModel.class); + + public EmaModel(String name, String hostname, String appId, double weight, double timessdev) { + this.metricName = name; + this.hostname = hostname; + this.appId = appId; + this.weight = weight; + this.timessdev = timessdev; + this.ema = 0.0; + this.ems = 0.0; + } + + public String getMetricName() { + return metricName; + } + + public String getHostname() { + return hostname; + } + + public String getAppId() { + return appId; + } + + public double testAndUpdate(double metricValue) { + + double anomalyScore = 0.0; + if (ctr > suppressAnomaliesTheshold) { + anomalyScore = test(metricValue); + } + if (Math.abs(anomalyScore) < 2 * timessdev) { + update(metricValue); + } else { + LOG.info("Not updating model for this value"); + } + ctr++; + LOG.info("Counter : " + ctr); + LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore); + return anomalyScore; + } + + public void update(double metricValue) { + ema = weight * ema + (1 - weight) * metricValue; + ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0)); + LOG.info("In update : ema = " + ema + ", ems = " + ems); + } + + public double test(double metricValue) { + LOG.info("In test : ema = " + ema + ", ems = " + ems); + double diff = Math.abs(ema - metricValue) - (timessdev * ems); + LOG.info("diff = " + diff); + if (diff > 0) { + 
return Math.abs((metricValue - ema) / ems); //Z score + } else { + return 0.0; + } + } + + public void updateModel(boolean increaseSensitivity, double percent) { + LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); + double delta = percent / 100; + if (increaseSensitivity) { + delta = delta * -1; + } + this.timessdev = timessdev + delta * timessdev; + this.weight = Math.min(1.0, weight + delta * weight); + LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight); + } + + public double getWeight() { + return weight; + } + + public void setWeight(double weight) { + this.weight = weight; + } + + public double getTimessdev() { + return timessdev; + } + + public void setTimessdev(double timessdev) { + this.timessdev = timessdev; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java new file mode 100644 index 0000000..62749c1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.methods.ema;

import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.util.Loader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Spark {@link Loader} for {@link EmaTechnique}.
 *
 * <p>NOTE(review): {@link #load} currently ignores its {@code path} argument and always hands
 * back a fresh, untrained technique. An earlier draft read the JSON written by
 * {@link EmaTechnique#save} from {@code path} with Gson, but it was disabled. Restore that
 * logic (or drop this note) once the persisted format is settled.
 */
public class EmaModelLoader implements Loader<EmaTechnique> {
    private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);

    /** Starting EMA weight given to every technique this loader creates. */
    private static final double DEFAULT_STARTING_WEIGHT = 0.5;

    /** Starting standard-deviation multiplier given to every technique this loader creates. */
    private static final double DEFAULT_START_TIMES_SDEV = 3.0;

    /**
     * Returns a new {@link EmaTechnique} built from the default parameters.
     *
     * @param sc   unused
     * @param path unused (see the class-level note)
     * @return a technique with no tracked models
     */
    @Override
    public EmaTechnique load(SparkContext sc, String path) {
        final EmaTechnique freshTechnique =
            new EmaTechnique(DEFAULT_STARTING_WEIGHT, DEFAULT_START_TIMES_SDEV);
        return freshTechnique;
    }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java new file mode 100644 index 0000000..c005e6f --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.methods.ema; + +import com.google.gson.Gson; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.util.Saveable; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.io.Writer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@XmlRootElement +public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable { + + @XmlElement(name = "trackedEmas") + private Map<String, EmaModel> trackedEmas; + private static final Log LOG = LogFactory.getLog(EmaTechnique.class); + + private double startingWeight = 0.5; + private double startTimesSdev = 3.0; + private String methodType = "ema"; + + public EmaTechnique(double startingWeight, double startTimesSdev) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + LOG.info("New EmaTechnique......"); + } + + public List<MetricAnomaly> test(TimelineMetric metric) { + String metricName = metric.getMetricName(); + String appId = metric.getAppId(); + String hostname = metric.getHostName(); + String key = metricName + "_" + appId + "_" + hostname; + + EmaModel emaModel = trackedEmas.get(key); + if (emaModel == null) { + LOG.info("EmaModel not present for " + key); + LOG.info("Number of tracked Emas : " + trackedEmas.size()); + emaModel = new EmaModel(metricName, hostname, appId, 
startingWeight, startTimesSdev); + trackedEmas.put(key, emaModel); + } else { + LOG.info("EmaModel already present for " + key); + } + + List<MetricAnomaly> anomalies = new ArrayList<>(); + + for (Long timestamp : metric.getMetricValues().keySet()) { + double metricValue = metric.getMetricValues().get(timestamp); + double anomalyScore = emaModel.testAndUpdate(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Found anomaly for : " + key); + MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); + anomalies.add(metricAnomaly); + } else { + LOG.info("Discarding non-anomaly for : " + key); + } + } + return anomalies; + } + + public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) { + String metricName = timelineMetric.getMetricName(); + String appId = timelineMetric.getAppId(); + String hostname = timelineMetric.getHostName(); + String key = metricName + "_" + appId + "_" + hostname; + + + EmaModel emaModel = trackedEmas.get(key); + + if (emaModel == null) { + LOG.warn("EMA Model for " + key + " not found"); + return false; + } + emaModel.updateModel(increaseSensitivity, percent); + + return true; + } + + @Override + public void save(SparkContext sc, String path) { + Gson gson = new Gson(); + try { + String json = gson.toJson(this); + try (Writer writer = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(path), "utf-8"))) { + writer.write(json); + } + } catch (IOException e) { + LOG.error(e); + } + } + + @Override + public String formatVersion() { + return "1.0"; + } + + public Map<String, EmaModel> getTrackedEmas() { + return trackedEmas; + } + + public double getStartingWeight() { + return startingWeight; + } + + public double getStartTimesSdev() { + return startTimesSdev; + } + +} + 
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java new file mode 100644 index 0000000..50bf9f2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev; + +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median; +import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev; + +import java.io.Serializable; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class HsdevTechnique implements Serializable { + + private Map<String, Double> hsdevMap; + private String methodType = "hsdev"; + private static final Log LOG = LogFactory.getLog(HsdevTechnique.class); + + public HsdevTechnique() { + hsdevMap = new HashMap<>(); + } + + public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) { + int testLength = testData.values.length; + int trainLength = trainData.values.length; + + if (trainLength < testLength) { + LOG.info("Not enough train data."); + return null; + } + + if (!hsdevMap.containsKey(key)) { + hsdevMap.put(key, 3.0); + } + + double n = hsdevMap.get(key); + + double historicSd = sdev(trainData.values, false); + double historicMedian = median(trainData.values); + double currentMedian = median(testData.values); + + double diff = Math.abs(currentMedian - historicMedian); + LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1])); + LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); + + if (diff > n * historicSd) { + double zScore = diff / historicSd; + LOG.info("Z Score of current series : " + zScore); + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + zScore); + } + return 
null; + } + +}
