// Recovered from a commit-notification diff in which this file was deleted (commit 4613b471):
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Spark Streaming entry point of the anomaly detector prototype.
 *
 * <p>Reads JSON-encoded {@link TimelineMetrics} from the Kafka topic {@code ambari-metrics-topic},
 * keeps only the configured appIds, hosts and metric-name patterns, runs the EMA test on every
 * remaining metric and publishes the resulting anomalies through {@link MetricsCollectorInterface}.
 * While processing, it also triggers the Tukeys, KS and HSdev runs whenever their configured
 * intervals have elapsed.
 */
public class MetricSparkConsumer {

  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);

  // ObjectMapper is expensive to build and safe to share once configured, so it is created once
  // per JVM instead of once per Kafka message.
  private static final ObjectMapper MAPPER = new ObjectMapper();

  private static String groupId = "ambari-metrics-group";
  private static String topicName = "ambari-metrics-topic";
  private static int numThreads = 1;

  // NOTE(review): the fields below are mutated from inside rdd.foreach closures, so each JVM that
  // executes a closure keeps its own copy of this state -- confirm that this is acceptable for
  // the intended deployment mode.
  private static long pitStartTime = System.currentTimeMillis();
  private static long ksStartTime = pitStartTime;
  private static long hdevStartTime = ksStartTime;
  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
  private static Set<String> includedHosts = new HashSet<>();
  private static Set<TrendMetric> trendMetrics = new HashSet<>();

  public MetricSparkConsumer() {
  }

  /**
   * Loads a properties file, looking on the system classpath first and on the file system second.
   *
   * @param propertiesFile classpath resource name or file system path
   * @return the loaded properties, or {@code null} when the file cannot be opened or read
   */
  public static Properties readProperties(String propertiesFile) {
    // try-with-resources guarantees the stream is closed; the original leaked it.
    try (InputStream inputStream = openPropertiesStream(propertiesFile)) {
      Properties properties = new Properties();
      properties.load(inputStream);
      return properties;
    } catch (IOException ioEx) {
      LOG.error("Error reading properties file " + propertiesFile, ioEx);
      return null;
    }
  }

  /** Opens the properties file from the system classpath, falling back to the file system. */
  private static InputStream openPropertiesStream(String propertiesFile) throws IOException {
    InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
    if (inputStream == null) {
      inputStream = new FileInputStream(propertiesFile);
    }
    return inputStream;
  }

  /** Returns the value of a mandatory property, failing with a message that names the key. */
  private static String requiredProperty(Properties properties, String key) {
    String value = properties.getProperty(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required property: " + key);
    }
    return value;
  }

  /**
   * Builds the streaming pipeline and blocks until the streaming context terminates.
   *
   * @param args {@code args[0]} is the path (or classpath resource name) of the configuration file
   * @throws InterruptedException if the wait for termination is interrupted
   * @throws IllegalArgumentException if a mandatory property is missing from the configuration
   */
  public static void main(String[] args) throws InterruptedException {

    if (args.length < 1) {
      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
      System.exit(1);
    }

    Properties properties = readProperties(args[0]);
    if (properties == null) {
      // readProperties already logged the cause; stop here instead of failing with an NPE below.
      System.err.println("Unable to load configuration from " + args[0]);
      System.exit(1);
    }

    List<String> appIds = Arrays.asList(requiredProperty(properties, "appIds").split(","));

    String collectorHost = properties.getProperty("collectorHost");
    String collectorPort = properties.getProperty("collectorPort");
    String collectorProtocol = properties.getProperty("collectorProtocol");

    String zkQuorum = properties.getProperty("zkQuorum");

    double emaW = Double.parseDouble(requiredProperty(properties, "emaW"));
    double emaN = Double.parseDouble(requiredProperty(properties, "emaN"));
    int emaThreshold = Integer.parseInt(requiredProperty(properties, "emaThreshold"));
    double tukeysN = Double.parseDouble(requiredProperty(properties, "tukeysN"));

    long pitTestInterval = Long.parseLong(requiredProperty(properties, "pointInTimeTestInterval"));
    long pitTrainInterval = Long.parseLong(requiredProperty(properties, "pointInTimeTrainInterval"));

    long ksTestInterval = Long.parseLong(requiredProperty(properties, "ksTestInterval"));
    long ksTrainInterval = Long.parseLong(requiredProperty(properties, "ksTrainInterval"));
    int hsdevNhp = Integer.parseInt(requiredProperty(properties, "hsdevNhp"));
    long hsdevInterval = Long.parseLong(requiredProperty(properties, "hsdevInterval"));

    String ambariServerHost = properties.getProperty("ambariServerHost");
    String clusterName = properties.getProperty("clusterName");

    String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
    if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
      String[] patterns = includeMetricPatternStrings.split(",");
      for (String p : patterns) {
        LOG.info("Included Pattern : " + p);
        includeMetricPatterns.add(Pattern.compile(p));
      }
    }

    String includedHostList = properties.getProperty("hosts");
    if (includedHostList != null && !includedHostList.isEmpty()) {
      String[] hosts = includedHostList.split(",");
      includedHosts.addAll(Arrays.asList(hosts));
    }

    MetricsCollectorInterface metricsCollectorInterface =
      new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);

    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");

    // Micro-batch interval of 10 seconds.
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
      tukeysN,
      pitTestInterval,
      pitTrainInterval,
      ambariServerHost,
      clusterName);

    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
      ksTestInterval,
      ksTrainInterval,
      hsdevNhp);

    Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
    Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
    Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
    Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);

    JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));

    //Convert JSON string to TimelineMetrics.
    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
      @Override
      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
        return MAPPER.readValue(message._2, TimelineMetrics.class);
      }
    });

    timelineMetricsStream.print();

    //Group TimelineMetric by AppId. Empty payloads are keyed under the placeholder appId "TEST".
    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
      timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
    );

    appMetricStream.print();

    //Filter AppIds that are not needed.
    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
        return appIds.contains(appMetricTuple._1);
      }
    });

    filteredAppMetricStream.print();

    filteredAppMetricStream.foreachRDD(rdd -> {
      rdd.foreach(
        tuple2 -> {
          long currentTime = System.currentTimeMillis();
          EmaTechnique ema = emaTechniqueBroadcast.getValue();

          // Each periodic job advances its own start marker by exactly one interval per run.
          if (currentTime > pitStartTime + pitTestInterval) {
            LOG.info("Running Tukeys....");
            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
            pitStartTime = pitStartTime + pitTestInterval;
          }

          if (currentTime > ksStartTime + ksTestInterval) {
            LOG.info("Running KS Test....");
            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
            ksStartTime = ksStartTime + ksTestInterval;
          }

          if (currentTime > hdevStartTime + hsdevInterval) {
            LOG.info("Running HSdev Test....");
            trendADSystemBroadcast.getValue().runHsdevMethod();
            hdevStartTime = hdevStartTime + hsdevInterval;
          }

          Set<String> hosts = includedHostBroadcast.getValue();
          Set<Pattern> patterns = includePatternBroadcast.getValue();

          TimelineMetrics metrics = tuple2._2();
          for (TimelineMetric timelineMetric : metrics.getMetrics()) {

            // NOTE(review): an empty host list excludes every metric, whereas an empty pattern
            // list includes every metric -- confirm the asymmetry is intended.
            boolean includeHost = hosts.contains(timelineMetric.getHostName());
            boolean includeMetric = false;
            if (includeHost) {
              includeMetric = patterns.isEmpty();
              for (Pattern p : patterns) {
                Matcher m = p.matcher(timelineMetric.getMetricName());
                if (m.find()) {
                  includeMetric = true;
                  break; // one matching pattern is enough
                }
              }
            }

            if (includeMetric) {
              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
                timelineMetric.getHostName()));
              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
            }
          }
        });
    });

    jssc.start();
    jssc.awaitTermination();
  }
}
// Recovered from a commit-notification diff in which this file was deleted (commit 4613b471):
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;

import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;

/**
 * HTTP client for the metrics collector's {@code /ws/v1/timeline/metrics} endpoint.
 *
 * <p>Publishes detected anomalies as {@link TimelineMetric}s (POST) and fetches stored metric
 * series (GET).
 */
public class MetricsCollectorInterface implements Serializable {

  private static String hostName = null;
  private String instanceId = null;
  public final static String serviceName = "anomaly-engine";
  private String collectorHost;
  private String protocol;
  private String port;
  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
  private static ObjectMapper mapper;
  private final static ObjectReader timelineObjectReader;

  static {
    mapper = new ObjectMapper();
    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
    mapper.setAnnotationIntrospector(introspector);
    // SerializationConfig.withSerializationInclusion(...) returns a NEW config object; the
    // original discarded that result, so NON_NULL was never applied. Set it on the mapper itself.
    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
    timelineObjectReader = mapper.reader(TimelineMetrics.class);
  }

  /**
   * @param collectorHost host name of the metrics collector
   * @param protocol      URL scheme used to reach the collector
   * @param port          collector port
   */
  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
    this.collectorHost = collectorHost;
    this.protocol = protocol;
    this.port = port;
    // hostName is static: resolve it once and share it across all instances in this JVM.
    hostName = getDefaultLocalHostName();
  }

  /**
   * Returns the cached host name if one was resolved already, otherwise the canonical name of the
   * local host.
   *
   * @return the host name, or {@code null} when the local host cannot be resolved
   */
  public static String getDefaultLocalHostName() {

    if (hostName != null) {
      return hostName;
    }

    try {
      return InetAddress.getLocalHost().getCanonicalHostName();
    } catch (UnknownHostException e) {
      LOG.info("Error getting host address", e);
    }
    return null;
  }

  /**
   * Converts the given anomalies to timeline metrics and posts them to the collector.
   *
   * @param metricAnomalies anomalies to publish; {@code null} or empty is a no-op
   */
  public void publish(List<MetricAnomaly> metricAnomalies) {
    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
      if (!metricList.isEmpty()) {
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.setMetrics(metricList);
        emitMetrics(timelineMetrics);
      }
    } else {
      LOG.debug("No anomalies to send.");
    }
  }

  /** Maps every anomaly to a single-point {@link TimelineMetric} tagged with method and score. */
  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
    List<TimelineMetric> metrics = new ArrayList<>();

    if (metricAnomalies.isEmpty()) {
      return metrics;
    }

    for (MetricAnomaly anomaly : metricAnomalies) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(anomaly.getMetricKey());
      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
      timelineMetric.setInstanceId(null);
      timelineMetric.setHostName(getDefaultLocalHostName());
      timelineMetric.setStartTime(anomaly.getTimestamp());
      HashMap<String, String> metadata = new HashMap<>();
      metadata.put("method", anomaly.getMethodType());
      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
      timelineMetric.setMetadata(metadata);
      TreeMap<Long,Double> metricValues = new TreeMap<>();
      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
      timelineMetric.setMetricValues(metricValues);

      metrics.add(timelineMetric);
    }
    return metrics;
  }

  /**
   * Serializes the metrics to JSON and posts them to the collector.
   *
   * @param metrics metrics to send
   * @return {@code false} when serialization fails or the request throws an {@link IOException};
   *         otherwise {@code true}
   */
  public boolean emitMetrics(TimelineMetrics metrics) {
    String connectUrl = constructTimelineMetricUri();
    String jsonData = null;
    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
    try {
      jsonData = mapper.writeValueAsString(metrics);
      LOG.info(jsonData);
    } catch (IOException e) {
      LOG.error("Unable to parse metrics", e);
    }
    if (jsonData != null) {
      return emitMetricsJson(connectUrl, jsonData);
    }
    return false;
  }

  private HttpURLConnection getConnection(String spec) throws IOException {
    return (HttpURLConnection) new URL(spec).openConnection();
  }

  /**
   * POSTs the JSON payload to the given URL with 10 second connect and read timeouts.
   *
   * @return {@code true} once a response status was received, {@code false} on I/O failure
   */
  private boolean emitMetricsJson(String connectUrl, String jsonData) {
    int timeout = 10000;
    HttpURLConnection connection = null;
    try {
      if (connectUrl == null) {
        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
      }
      connection = getConnection(connectUrl);

      connection.setRequestMethod("POST");
      connection.setRequestProperty("Content-Type", "application/json");
      connection.setRequestProperty("Connection", "Keep-Alive");
      connection.setConnectTimeout(timeout);
      connection.setReadTimeout(timeout);
      connection.setDoOutput(true);

      if (jsonData != null) {
        try (OutputStream os = connection.getOutputStream()) {
          os.write(jsonData.getBytes(StandardCharsets.UTF_8));
        }
      }

      int statusCode = connection.getResponseCode();

      if (statusCode != 200) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
          "statusCode = " + statusCode);
      } else {
        LOG.info("Metrics posted to Collector " + connectUrl);
      }
      // NOTE(review): a non-200 status is only logged and still reported as success -- confirm
      // callers do not rely on the return value to detect rejected payloads.
      return true;
    } catch (IOException ioe) {
      // Log with the exception so the stack trace and cause are not lost.
      LOG.error("Unable to POST metrics to collector, " + connectUrl, ioe);
    }
    return false;
  }

  private String constructTimelineMetricUri() {
    StringBuilder sb = new StringBuilder(protocol);
    sb.append("://");
    sb.append(collectorHost);
    sb.append(":");
    sb.append(port);
    sb.append(WS_V1_TIMELINE_METRICS);
    return sb.toString();
  }

  /** URL-encodes one query parameter value; {@code null} is sent as the text "null" as before. */
  private static String encodeQueryValue(String value) {
    try {
      return URLEncoder.encode(String.valueOf(value), StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException e) {
      // Cannot happen: every Java platform implementation is required to support UTF-8.
      throw new IllegalStateException(e);
    }
  }

  /**
   * Fetches one metric series from the collector.
   *
   * @param metricName metric name to query
   * @param appId      appId the metric belongs to
   * @param hostname   host the metric was reported from
   * @param startime   start of the query window, passed through as {@code startTime}
   * @param endtime    end of the query window, passed through as {@code endTime}
   * @return the metrics returned by the collector; an empty {@link TimelineMetrics} when the
   *         request or the deserialization fails (the failure is logged). Never {@code null}.
   */
  public TimelineMetrics fetchMetrics(String metricName,
                                      String appId,
                                      String hostname,
                                      long startime,
                                      long endtime) {

    // Values are encoded so that names containing reserved characters (':', '%', '&', spaces, ...)
    // do not corrupt the query string.
    String url = constructTimelineMetricUri() + "?metricNames=" + encodeQueryValue(metricName) +
      "&appId=" + encodeQueryValue(appId) +
      "&hostname=" + encodeQueryValue(hostname) + "&startTime=" + startime + "&endTime=" + endtime;
    LOG.debug("Fetch metrics URL : " + url);

    TimelineMetrics timelineMetrics = new TimelineMetrics();

    try {
      HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
      con.setRequestMethod("GET");
      int responseCode = con.getResponseCode();
      LOG.debug("Sending 'GET' request to URL : " + url);
      LOG.debug("Response Code : " + responseCode);

      try (BufferedReader in = new BufferedReader(
        new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
        TimelineMetrics fetched = timelineObjectReader.readValue(in);
        if (fetched != null) {
          // Keep the empty default when the body deserializes to null.
          timelineMetrics = fetched;
        }
      }
    } catch (Exception e) {
      LOG.error("Unable to fetch metrics from " + url, e);
    }

    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
    return timelineMetrics;
  }
}
// Recovered from a commit-notification diff in which this file was deleted (commit 4613b471):
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;

import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Point-in-time anomaly detection: periodically runs Tukeys (through {@link RFunctionInvoker}) on
 * the series tracked by an {@link EmaTechnique}, emits the Tukeys anomalies to the collector and
 * compares them with the EMA anomalies stored for the same window to nudge each EMA model.
 */
public class PointInTimeADSystem implements Serializable {

  private MetricsCollectorInterface metricsCollectorInterface;
  private Map<String, Double> tukeysNMap;
  private double defaultTukeysN = 3;

  // Defaults; both are overwritten by the constructor arguments.
  private long testIntervalMillis = 5*60*1000; // 5 minutes
  private long trainIntervalMillis = 15*60*1000; // 15 minutes

  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);

  private AmbariServerInterface ambariServerInterface;
  private int sensitivity = 50;
  private int minSensitivity = 0;
  private int maxSensitivity = 100;

  /**
   * @param metricsCollectorInterface client used to fetch series and emit anomalies
   * @param defaultTukeysN            initial Tukeys N applied to a metric key the first time it is seen
   * @param testIntervalMillis        length of the test window in milliseconds
   * @param trainIntervalMillis       length of the training window preceding the test window
   * @param ambariServerHost          passed to {@link AmbariServerInterface}
   * @param clusterName               passed to {@link AmbariServerInterface}
   */
  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
    this.metricsCollectorInterface = metricsCollectorInterface;
    this.defaultTukeysN = defaultTukeysN;
    this.tukeysNMap = new HashMap<>();
    this.testIntervalMillis = testIntervalMillis;
    this.trainIntervalMillis = trainIntervalMillis;
    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
    LOG.info("Starting PointInTimeADSystem...");
  }

  /**
   * Runs Tukeys for every EMA-tracked metric over the window ending at {@code startTime}, emits
   * the Tukeys anomalies and adjusts each EMA model from the Tukeys/EMA agreement.
   *
   * @param emaTechnique technique whose tracked EMA models are evaluated and updated
   * @param startTime    end of the test window (epoch millis); the test window is
   *                     {@code (startTime - testIntervalMillis, startTime]} and the training
   *                     window is the {@code trainIntervalMillis} before it
   */
  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");

    adjustTukeysNForSensitivity();

    long testWindowStart = startTime - testIntervalMillis;

    TimelineMetrics timelineMetrics = new TimelineMetrics();
    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
      LOG.info("EMA key = " + metricKey);
      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
      String metricName = emaModel.getMetricName();
      String appId = emaModel.getAppId();
      String hostname = emaModel.getHostname();

      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
        startTime);

      if (tukeysData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();

      // Points at or before the start of the test window are training data, the rest test data.
      for (TimelineMetric metric : tukeysData.getMetrics()) {
        for (Map.Entry<Long, Double> point : metric.getMetricValues().entrySet()) {
          long timestamp = point.getKey();
          if (timestamp <= testWindowStart) {
            trainDataList.add(point.getValue());
            trainTsList.add((double) timestamp);
          } else {
            testDataList.add(point.getValue());
            testTsList.add((double) timestamp);
          }
        }
      }

      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
        LOG.info("Not enough train/test data to perform analysis.");
        continue;
      }

      double[] trainTs = toPrimitiveArray(trainTsList);
      double[] trainData = toPrimitiveArray(trainDataList);
      double[] testTs = toPrimitiveArray(testTsList);
      double[] testData = toPrimitiveArray(testDataList);

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries tukeysTrainData = new DataSeries("tukeysTrainSeries", trainTs, trainData);
      DataSeries tukeysTestData = new DataSeries("tukeysTestSeries", testTs, testData);

      // NOTE(review): the N stored for a key is never refreshed after a later sensitivity change
      // updates defaultTukeysN -- confirm that is intended.
      if (!tukeysNMap.containsKey(metricKey)) {
        tukeysNMap.put(metricKey, defaultTukeysN);
      }

      Map<String, String> configs = new HashMap<>();
      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));

      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);

      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();

      for (TimelineMetric tukeysMetric : tukeysMetrics) {
        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
      }

      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
      TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
      if (!emaData.getMetrics().isEmpty()) {
        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
      }

      LOG.info("Ema anomalies size : " + emaMetricValues.size());

      // Confusion matrix with Tukeys as the reference and EMA as the prediction.
      int tp = 0;
      int tn = 0;
      int fp = 0;
      int fn = 0;

      for (double ts : testTs) {
        long timestamp = (long) ts;
        boolean flaggedByTukeys = tukeysMetricValues.containsKey(timestamp);
        boolean flaggedByEma = emaMetricValues.containsKey(timestamp);
        if (flaggedByTukeys) {
          if (flaggedByEma) {
            tp++;
          } else {
            fn++;
          }
        } else {
          if (flaggedByEma) {
            fp++;
          } else {
            tn++;
          }
        }
      }

      // With a zero denominator these are NaN; NaN < 0.5 is false, so no model update happens.
      double recall = (double) tp / (double) (tp + fn);
      double precision = (double) tp / (double) (tp + fp);
      LOG.info("----------------------------");
      LOG.info("Precision Recall values for " + metricKey);
      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
      LOG.info("----------------------------");

      // NOTE(review): the messages say 10% while 5 is passed to updateModel -- confirm which is right.
      if (recall < 0.5) {
        LOG.info("Increasing EMA sensitivity by 10%");
        emaModel.updateModel(true, 5);
      } else if (precision < 0.5) {
        LOG.info("Decreasing EMA sensitivity by 10%");
        emaModel.updateModel(false, 5);
      }

    }

    if (emaTechnique.getTrackedEmas().isEmpty()){
      LOG.info("No EMA Technique keys tracked!!!!");
    }

    if (!timelineMetrics.getMetrics().isEmpty()) {
      metricsCollectorInterface.emitMetrics(timelineMetrics);
    }
  }

  /**
   * Moves {@code sensitivity} one step at a time towards the value reported by the Ambari server
   * (clamped to [minSensitivity, maxSensitivity]), scaling {@code defaultTukeysN} by 5% per step.
   * A reported value of -1, or one equal to the current sensitivity, changes nothing.
   */
  private void adjustTukeysNForSensitivity() {
    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
      LOG.info("No change in sensitivity needed.");
      return;
    }

    LOG.info("Current tukey's N value = " + defaultTukeysN);
    if (requiredSensivity > sensitivity) {
      int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
      while (sensitivity < targetSensitivity) {
        defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
        sensitivity++;
      }
    } else {
      int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
      while (sensitivity > targetSensitivity) {
        defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
        sensitivity--;
      }
    }
    LOG.info("New tukey's N value = " + defaultTukeysN);
  }

  /** Unboxes a list of doubles into a primitive array of the same length and order. */
  private static double[] toPrimitiveArray(List<Double> values) {
    double[] array = new double[values.size()];
    for (int i = 0; i < array.length; i++) {
      array[i] = values.get(i);
    }
    return array;
  }

  /**
   * Converts an R result (timestamps, values and anomaly scores at indexes 0, 1 and 2) into one
   * single-point {@link TimelineMetric} per row.
   *
   * @return the converted metrics; an empty list (never {@code null}) when the result is
   *         {@code null} or does not carry all three columns
   */
  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {

    List<TimelineMetric> timelineMetrics = new ArrayList<>();

    if (result == null) {
      LOG.info("ResultSet from R call is null!!");
      // Return the empty list: the caller dereferences the result immediately.
      return timelineMetrics;
    }

    // All three columns are read below, so require all of them to be present.
    if (result.resultset == null || result.resultset.size() < 3) {
      return timelineMetrics;
    }

    double[] ts = result.resultset.get(0);
    double[] metrics = result.resultset.get(1);
    double[] anomalyScore = result.resultset.get(2);
    for (int i = 0; i < ts.length; i++) {
      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
      timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
      timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
      timelineMetric.setInstanceId(null);
      timelineMetric.setStartTime((long) ts[i]);
      TreeMap<Long, Double> metricValues = new TreeMap<>();
      metricValues.put((long) ts[i], metrics[i]);

      HashMap<String, String> metadata = new HashMap<>();
      metadata.put("method", "tukeys");
      // String.valueOf(double) yields "Infinity"/"-Infinity", so the original comparison with
      // "infinity" could never match; test the value itself instead.
      if (Double.isInfinite(anomalyScore[i])) {
        LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
      } else {
        metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
      }
      timelineMetric.setMetadata(metadata);

      timelineMetric.setMetricValues(metricValues);
      timelineMetrics.add(timelineMetric);
    }

    return timelineMetrics;
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
// deleted file mode 100644
// index 4538f0b..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
// +++ /dev/null
// @@ -1,222 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;


import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RVector;
import org.rosuda.JRI.Rengine;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Static bridge that runs the anomaly-detection R scripts through a JRI {@link Rengine}.
 *
 * <p>Every public detection method follows the same sequence: source the R script from the
 * configured script directory, push the numeric parameters read from {@code configs} into the
 * R session, load the train/test series as data frames, call the R function and convert the
 * returned R vector into a {@link ResultSet}.
 *
 * <p>All methods return {@code null} when anything goes wrong; the failure is logged.
 */
public class RFunctionInvoker {

  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
  private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";

  /**
   * Publishes both series into the R session as the data frames {@code train_data} and
   * {@code test_data}, each with a {@code TS} column and a value column.
   *
   * @param r         engine to load the data into
   * @param trainData training series
   * @param testData  test series
   */
  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
    // NOTE(review): seriesName is spliced into the R expression without quoting, so it must
    // already be a valid R expression (e.g. a quoted string) - confirm against the callers.
    r.assign("train_ts", trainData.ts);
    r.assign("train_x", trainData.values);
    r.eval("train_data <- data.frame(train_ts,train_x)");
    r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");

    r.assign("test_ts", testData.ts);
    r.assign("test_x", testData.values);
    r.eval("test_data <- data.frame(test_ts,test_x)");
    r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
  }

  /** Sources the given script file (relative to the configured script directory) into R. */
  private static void sourceScript(String scriptFileName) {
    r.eval("source('" + rScriptDir + "/" + scriptFileName + "')");
  }

  /** Evaluates the named R variable and copies each element of the resulting vector out as a double array. */
  private static ResultSet collectResult(String rVariableName) {
    REXP expression = r.eval(rVariableName);
    RVector content = (RVector) expression.getContent();
    List<double[]> rows = new ArrayList<>(content.size());
    for (int i = 0; i < content.size(); i++) {
      rows.add(content.at(i).asDoubleArray());
    }
    return new ResultSet(rows);
  }

  /**
   * Overrides the directory the R scripts are sourced from.
   *
   * @param dir directory containing {@code tukeys.r}, {@code ema.r}, {@code kstest.r} and {@code hsdev.r}
   */
  public static void setScriptsDir(String dir) {
    rScriptDir = dir;
  }

  /**
   * Dispatches to the detection method selected by {@code methodType}.
   *
   * @param methodType one of {@code "tukeys"}, {@code "ema"}, {@code "ks"}, {@code "hsdev"};
   *                   any other value falls back to Tukeys
   * @param trainData  training series
   * @param testData   test series
   * @param configs    method parameters keyed by name (see the individual methods)
   * @return the result of the selected method, or {@code null} if it failed
   */
  public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    switch (methodType) {
      case "ema":
        return ema_global(trainData, testData, configs);
      case "ks":
        return ksTest(trainData, testData, configs);
      case "hsdev":
        return hsdev(trainData, testData, configs);
      case "tukeys":
      default:
        return tukeys(trainData, testData, configs);
    }
  }

  /**
   * Runs {@code ams_tukeys} from {@code tukeys.r}.
   *
   * @param configs must contain {@code tukeys.n} (parsed as a double)
   * @return the R result, or {@code null} on any failure
   */
  public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("tukeys.r");

      double n = Double.parseDouble(configs.get("tukeys.n"));
      r.eval("n <- " + n);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ams_tukeys(train_data, test_data, n)");
      return collectResult("an");
    } catch (Exception e) {
      LOG.error("Failed to run the tukeys R method", e);
    } finally {
      // NOTE(review): r is a single shared static engine; ending it after every call looks
      // like it could leave later invocations without a usable engine - confirm against JRI.
      r.end();
    }
    return null;
  }

  /**
   * Runs {@code ema_global} from {@code ema.r}.
   *
   * @param configs must contain {@code ema.n} (int) and {@code ema.w} (double)
   * @return the R result, or {@code null} on any failure
   */
  public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("ema.r");

      int n = Integer.parseInt(configs.get("ema.n"));
      r.eval("n <- " + n);

      double w = Double.parseDouble(configs.get("ema.w"));
      r.eval("w <- " + w);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ema_global(train_data, test_data, w, n)");
      return collectResult("an");
    } catch (Exception e) {
      LOG.error("Failed to run the ema_global R method", e);
    } finally {
      r.end(); // see the review note in tukeys()
    }
    return null;
  }

  /**
   * Runs {@code ema_daily} from {@code ema.r}.
   *
   * @param configs must contain {@code ema.n} (int) and {@code ema.w} (double)
   * @return the R result, or {@code null} on any failure
   */
  public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("ema.r");

      int n = Integer.parseInt(configs.get("ema.n"));
      r.eval("n <- " + n);

      double w = Double.parseDouble(configs.get("ema.w"));
      r.eval("w <- " + w);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ema_daily(train_data, test_data, w, n)");
      return collectResult("an");
    } catch (Exception e) {
      LOG.error("Failed to run the ema_daily R method", e);
    } finally {
      r.end(); // see the review note in tukeys()
    }
    return null;
  }

  /**
   * Runs {@code ams_ks} from {@code kstest.r}.
   *
   * @param configs must contain {@code ks.p_value} (double)
   * @return the R result, or {@code null} on any failure
   */
  public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("kstest.r");

      double p_value = Double.parseDouble(configs.get("ks.p_value"));
      r.eval("p_value <- " + p_value);

      loadDataSets(r, trainData, testData);

      r.eval("an <- ams_ks(train_data, test_data, p_value)");
      return collectResult("an");
    } catch (Exception e) {
      LOG.error("Failed to run the ks R method", e);
    } finally {
      r.end(); // see the review note in tukeys()
    }
    return null;
  }

  /**
   * Runs {@code hsdev_daily} from {@code hsdev.r}.
   *
   * @param configs must contain {@code hsdev.n} (int), {@code hsdev.nhp} (int),
   *                {@code hsdev.interval} (long) and {@code hsdev.period} (long)
   * @return the R result, or {@code null} on any failure
   */
  public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
    try {
      sourceScript("hsdev.r");

      int n = Integer.parseInt(configs.get("hsdev.n"));
      r.eval("n <- " + n);

      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
      r.eval("nhp <- " + nhp);

      long interval = Long.parseLong(configs.get("hsdev.interval"));
      r.eval("interval <- " + interval);

      long period = Long.parseLong(configs.get("hsdev.period"));
      r.eval("period <- " + period);

      loadDataSets(r, trainData, testData);

      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
      return collectResult("an2");
    } catch (Exception e) {
      LOG.error("Failed to run the hsdev R method", e);
    } finally {
      r.end(); // see the review note in tukeys()
    }
    return null;
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
// deleted file mode 100644
// index 2a205d1..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
// +++ /dev/null
// @@ -1,317 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;

import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Trend anomaly detection driver.
 *
 * <p>{@link #runKSTest} fetches each tracked metric over a train + test window, runs the KS
 * technique on it, emits any anomaly found and remembers it. {@link #runHsdevMethod} then
 * re-checks every remembered KS anomaly with the HSDev technique over a longer history,
 * emits the confirmed ones and clears the remembered set.
 */
public class TrendADSystem implements Serializable {

  private MetricsCollectorInterface metricsCollectorInterface;
  private List<TrendMetric> trendMetrics;

  private long ksTestIntervalMillis = 10 * 60 * 1000;
  private long ksTrainIntervalMillis = 10 * 60 * 1000;
  private KSTechnique ksTechnique;

  private HsdevTechnique hsdevTechnique;
  private int hsdevNumHistoricalPeriods = 3;

  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
  private String inputFile = "";

  /**
   * @param metricsCollectorInterface client used to fetch metric data and emit anomaly metrics
   * @param ksTestIntervalMillis      length of the KS test window, in milliseconds
   * @param ksTrainIntervalMillis     length of the KS train window that precedes the test window
   * @param hsdevNumHistoricalPeriods number of test-window-sized periods used as HSDev training history
   */
  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
                       long ksTestIntervalMillis,
                       long ksTrainIntervalMillis,
                       int hsdevNumHistoricalPeriods) {

    this.metricsCollectorInterface = metricsCollectorInterface;
    this.ksTestIntervalMillis = ksTestIntervalMillis;
    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;

    this.ksTechnique = new KSTechnique();
    this.hsdevTechnique = new HsdevTechnique();

    trendMetrics = new ArrayList<>();
  }

  /**
   * Runs the KS test for every metric in {@code trendMetrics} over the window ending at
   * {@code currentEndTime}. Anomalies are emitted through the collector interface and
   * remembered for a later {@link #runHsdevMethod()} pass.
   *
   * @param currentEndTime end of the test window (epoch millis)
   * @param trendMetrics   metrics to test; note that this parameter shadows the field of the
   *                       same name, so the loop below iterates the argument, not the field
   */
  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
    readInputFile(inputFile);

    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
    long ksTrainIntervalStartTime = ksTestIntervalStartTime - ksTrainIntervalMillis;
    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
      new Date(currentEndTime) + "], with train data period [" + new Date(ksTrainIntervalStartTime)
      + " : " + new Date(ksTestIntervalStartTime) + "]");

    for (TrendMetric metric : trendMetrics) {
      String metricName = metric.metricName;
      String appId = metric.appId;
      String hostname = metric.hostname;
      String key = metricName + ":" + appId + ":" + hostname;

      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname,
        ksTrainIntervalStartTime, currentEndTime);

      if (ksData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for KS, metricKey = " + key);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();
      splitByTime(ksData, ksTestIntervalStartTime, trainTsList, trainDataList, testTsList, testDataList);

      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
      if (!hasEnoughData(trainDataList, testDataList)) {
        LOG.info("Not enough train/test data to perform KS analysis.");
        continue;
      }

      double[] trainTs = toPrimitiveArray(trainTsList);
      double[] trainData = toPrimitiveArray(trainDataList);
      double[] testTs = toPrimitiveArray(testTsList);
      double[] testData = toPrimitiveArray(testDataList);

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries ksTrainData = new DataSeries("KSTrainSeries", trainTs, trainData);
      DataSeries ksTestData = new DataSeries("KSTestSeries", testTs, testData);

      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
      if (metricAnomaly == null) {
        LOG.info("No anomaly from KS test.");
      } else {
        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
          ksTestIntervalStartTime, currentEndTime, ksTrainIntervalStartTime, ksTestIntervalStartTime);
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
        metricsCollectorInterface.emitMetrics(timelineMetrics);

        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
      }
    }

    if (trendMetrics.isEmpty()) {
      LOG.info("No Trend metrics tracked!!!!");
    }

  }

  /**
   * Converts an anomaly into the timeline metric that is published for it. The metric carries
   * a single data point at {@code testEnd} and the window boundaries as metadata.
   */
  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
                                             long testStart,
                                             long testEnd,
                                             long trainStart,
                                             long trainEnd) {

    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
    timelineMetric.setInstanceId(null);
    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
    timelineMetric.setStartTime(testEnd);

    HashMap<String, String> metadata = new HashMap<>();
    metadata.put("method", metricAnomaly.getMethodType());
    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
    metadata.put("test-start-time", String.valueOf(testStart));
    metadata.put("train-start-time", String.valueOf(trainStart));
    metadata.put("train-end-time", String.valueOf(trainEnd));
    timelineMetric.setMetadata(metadata);

    TreeMap<Long, Double> metricValues = new TreeMap<>();
    metricValues.put(testEnd, metricAnomaly.getMetricValue());
    timelineMetric.setMetricValues(metricValues);
    return timelineMetric;

  }

  /**
   * Re-checks every KS anomaly remembered by {@link #runKSTest} with the HSDev technique.
   * Confirmed anomalies are emitted in one batch; for unconfirmed ones the KS model is asked to
   * lower its sensitivity. The remembered set is cleared afterwards in all cases.
   */
  public void runHsdevMethod() {

    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();

    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {

      long hsdevTestEnd = ksSingleRunKey.endTime;
      long hsdevTestStart = ksSingleRunKey.startTime;

      long period = hsdevTestEnd - hsdevTestStart;

      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
      long hsdevTrainEnd = hsdevTestStart;

      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
        + " : " + new Date(hsdevTrainEnd) + "]");

      String metricName = ksSingleRunKey.metricName;
      String appId = ksSingleRunKey.appId;
      String hostname = ksSingleRunKey.hostname;
      // NOTE(review): runKSTest builds its key with ':' separators while this one uses '_'.
      // The same key is passed to ksTechnique.updateModel below, so if KSTechnique looks its
      // model up by the key it was given in runKsTest the update may never match - confirm.
      String key = metricName + "_" + appId + "_" + hostname;

      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
        metricName,
        appId,
        hostname,
        hsdevTrainStart,
        hsdevTestEnd);

      if (hsdevData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();
      splitByTime(hsdevData, hsdevTestStart, trainTsList, trainDataList, testTsList, testDataList);

      if (!hasEnoughData(trainDataList, testDataList)) {
        LOG.info("Not enough train/test data to perform Hsdev analysis.");
        continue;
      }

      double[] trainTs = toPrimitiveArray(trainTsList);
      double[] trainData = toPrimitiveArray(trainDataList);
      double[] testTs = toPrimitiveArray(testTsList);
      double[] testData = toPrimitiveArray(testDataList);

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries hsdevTrainData = new DataSeries("HsdevTrainSeries", trainTs, trainData);
      DataSeries hsdevTestData = new DataSeries("HsdevTestSeries", testTs, testData);

      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
      if (metricAnomaly == null) {
        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
        ksTechnique.updateModel(key, false, 10);
      } else {
        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
      }
    }
    clearTrackedKsRunKeys();

    if (!hsdevMetricAnomalies.isEmpty()) {
      LOG.info("Publishing Hsdev Anomalies....");
      TimelineMetrics timelineMetrics = new TimelineMetrics();
      timelineMetrics.setMetrics(hsdevMetricAnomalies);
      metricsCollectorInterface.emitMetrics(timelineMetrics);
    }
  }

  /**
   * Appends every data point of {@code data} to the train lists when its timestamp is at or
   * before {@code splitTime}, and to the test lists otherwise.
   */
  private static void splitByTime(TimelineMetrics data,
                                  long splitTime,
                                  List<Double> trainTs,
                                  List<Double> trainValues,
                                  List<Double> testTs,
                                  List<Double> testValues) {
    for (TimelineMetric timelineMetric : data.getMetrics()) {
      for (Map.Entry<Long, Double> point : timelineMetric.getMetricValues().entrySet()) {
        long timestamp = point.getKey();
        if (timestamp <= splitTime) {
          trainValues.add(point.getValue());
          trainTs.add((double) timestamp);
        } else {
          testValues.add(point.getValue());
          testTs.add((double) timestamp);
        }
      }
    }
  }

  /** Both sides must be non-empty and the train side must be at least as large as the test side. */
  private static boolean hasEnoughData(List<Double> trainValues, List<Double> testValues) {
    return !trainValues.isEmpty()
      && !testValues.isEmpty()
      && trainValues.size() >= testValues.size();
  }

  /** Unboxes a list of doubles into a primitive array of the same length and order. */
  private static double[] toPrimitiveArray(List<Double> values) {
    double[] result = new double[values.size()];
    for (int i = 0; i < result.length; i++) {
      result[i] = values.get(i);
    }
    return result;
  }

  private void clearTrackedKsRunKeys() {
    trackedKsAnomalies.clear();
  }

  /**
   * Reloads the {@code trendMetrics} field from a CSV file whose lines are
   * {@code metricName,appId,hostname}.
   *
   * <p>The list is always cleared first. Nothing is read when no file is configured, and
   * lines with fewer than three fields are skipped with a warning instead of aborting the run.
   */
  private void readInputFile(String fileName) {
    trendMetrics.clear();
    if (fileName == null || fileName.isEmpty()) {
      // No input file configured: opening "" can only fail, so do not try.
      LOG.debug("No trend metric input file configured.");
      return;
    }
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
      for (String line; (line = br.readLine()) != null; ) {
        String[] splits = line.split(",");
        if (splits.length < 3) {
          LOG.warn("Skipping malformed trend metric line (expected metricName,appId,hostname) : " + line);
          continue;
        }
        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
      }
    } catch (IOException e) {
      LOG.error("Error reading input file : " + e);
    }
  }

  /** Identifies one KS run: the test window plus the metric it was run for. */
  class KsSingleRunKey implements Serializable{

    long startTime;
    long endTime;
    String metricName;
    String appId;
    String hostname;

    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
      this.startTime = startTime;
      this.endTime = endTime;
      this.metricName = metricName;
      this.appId = appId;
      this.hostname = hostname;
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
// deleted file mode 100644
// index
// 0640142..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
// +++ /dev/null
// @@ -1,33 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.core;

import java.io.Serializable;
import java.util.Objects;

/**
 * Identifies one metric tracked for trend analysis by its name, application id and host.
 *
 * <p>Instances are kept in hash-based sets, so equality is defined on the three identifying
 * fields: two instances describing the same metric are the same set element.
 */
public class TrendMetric implements Serializable {

  String metricName;
  String appId;
  String hostname;

  /**
   * @param metricName name of the metric
   * @param appId      id of the application reporting the metric
   * @param hostname   host the metric is reported from
   */
  public TrendMetric(String metricName, String appId, String hostname) {
    this.metricName = metricName;
    this.appId = appId;
    this.hostname = hostname;
  }

  /** Two trend metrics are equal when metric name, app id and hostname are all equal. */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    TrendMetric that = (TrendMetric) other;
    return Objects.equals(metricName, that.metricName)
      && Objects.equals(appId, that.appId)
      && Objects.equals(hostname, that.hostname);
  }

  @Override
  public int hashCode() {
    return Objects.hash(metricName, appId, hostname);
  }

  @Override
  public String toString() {
    return "TrendMetric{metricName=" + metricName + ", appId=" + appId + ", hostname=" + hostname + "}";
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
// deleted file mode 100644
// index
// 0b10b4b..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
// +++ /dev/null
// @@ -1,32 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.methods;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

import java.sql.Time;
import java.util.List;
import java.util.Map;

/**
 * Base class for anomaly detection techniques that examine one timeline metric at a time.
 */
public abstract class AnomalyDetectionTechnique {

  /** Label of the technique; available to subclasses. */
  protected String methodType;

  /**
   * Examines the given metric for anomalies.
   *
   * @param metric the metric to examine
   * @return the anomalies found for the metric
   */
  public abstract List<MetricAnomaly> test(TimelineMetric metric);

}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
// deleted file mode 100644
// index da4f030..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
// +++ /dev/null
// @@ -1,86 +0,0 @@
// /**
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.  The ASF licenses this file
//  * to you under the Apache License, Version 2.0 (the
//  * "License"); you may not use this file except in compliance
//  * with the License.  You may obtain a copy of the License at
//  * <p>
//  * http://www.apache.org/licenses/LICENSE-2.0
//  * <p>
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
/*
 * End of the Apache License 2.0 header of MetricAnomaly.java; the header text itself
 * starts on the preceding line of this archive.
 */
package org.apache.ambari.metrics.alertservice.prototype.methods;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * One detected anomaly: which metric, at what time and value, by which method and with
 * what score. All properties are mutable.
 */
public class MetricAnomaly implements Serializable{

  private String metricKey;
  private long timestamp;
  private double metricValue;
  private String methodType;
  private double anomalyScore;

  /**
   * @param metricKey    key identifying the metric the anomaly was found in
   * @param timestamp    timestamp of the anomalous data point
   * @param metricValue  value of the anomalous data point
   * @param methodType   label of the detection method that reported the anomaly
   * @param anomalyScore score assigned by the detection method
   */
  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
    this.methodType = methodType;
    this.anomalyScore = anomalyScore;
    this.metricKey = metricKey;
    this.timestamp = timestamp;
    this.metricValue = metricValue;
  }

  // --- metric key --------------------------------------------------------

  public String getMetricKey() {
    return metricKey;
  }

  public void setMetricKey(String metricKey) {
    this.metricKey = metricKey;
  }

  /** Alias of {@link #setMetricKey(String)}: the given name is stored as the metric key. */
  public void setMetricName(String metricName) {
    this.metricKey = metricName;
  }

  // --- data point --------------------------------------------------------

  public long getTimestamp() {
    return timestamp;
  }

  public void setTimestamp(long timestamp) {
    this.timestamp = timestamp;
  }

  public double getMetricValue() {
    return metricValue;
  }

  public void setMetricValue(double metricValue) {
    this.metricValue = metricValue;
  }

  // --- detection result --------------------------------------------------

  public String getMethodType() {
    return methodType;
  }

  public void setMethodType(String methodType) {
    this.methodType = methodType;
  }

  public double getAnomalyScore() {
    return anomalyScore;
  }

  public void setAnomalyScore(double anomalyScore) {
    this.anomalyScore = anomalyScore;
  }

}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
// b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
// deleted file mode 100644
// index a31410d..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
// +++ /dev/null
// @@ -1,131 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.methods.ema;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;

import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;

/**
 * Exponential moving average model for a single metric / app / host combination.
 *
 * <p>The model keeps an exponentially weighted mean ({@code ema}) and an exponentially weighted
 * deviation ({@code ems}). A value is anomalous when it lies more than {@code timessdev * ems}
 * away from {@code ema}.
 */
@XmlRootElement
public class EmaModel implements Serializable {

  private String metricName;
  private String hostname;
  private String appId;
  private double ema;
  private double ems;
  private double weight;
  private double timessdev;

  // Number of samples consumed during warm-up; stops increasing once it exceeds the threshold.
  private int ctr = 0;

  private static final Log LOG = LogFactory.getLog(EmaModel.class);

  /**
   * @param name      metric name
   * @param hostname  host the metric is reported from
   * @param appId     application id of the metric
   * @param weight    weight given to the previous average/deviation on every update;
   *                  the new sample gets {@code 1 - weight}
   * @param timessdev how many deviations away from the average a value must be to count as an anomaly
   */
  public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
    this.metricName = name;
    this.hostname = hostname;
    this.appId = appId;
    this.weight = weight;
    this.timessdev = timessdev;
    this.ema = 0.0;
    this.ems = 0.0;
  }

  public String getMetricName() {
    return metricName;
  }

  public String getHostname() {
    return hostname;
  }

  public String getAppId() {
    return appId;
  }

  /** Identifier used in log messages: {@code metricName:appId:hostname}. */
  private String modelId() {
    return metricName + ":" + appId + ":" + hostname;
  }

  /**
   * Scores the value against the current model and then folds it into the model.
   *
   * <p>While the model is warming up (until more than {@code suppressAnomaliesTheshold} samples
   * have been consumed) the value only updates the model and the score is {@code 0.0}.
   *
   * <p>The test must run <em>before</em> the update. After {@link #update(double)} the deviation
   * already contains the sample, so {@code ems >= sqrt(1 - weight) * |value - ema|}; testing the
   * same sample afterwards can only succeed when {@code timessdev * sqrt(1 - weight) < 1}, which
   * with weight 0.5 and timessdev 3 is never the case.
   *
   * @param metricValue the new sample
   * @return the anomaly score of the sample, or {@code 0.0} if it is not an anomaly or the
   *         model is still warming up
   */
  public double testAndUpdate(double metricValue) {

    double anomalyScore = 0.0;
    LOG.info("Before Update ->" + modelId() + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);

    boolean warmedUp = ctr > suppressAnomaliesTheshold;
    if (warmedUp) {
      // Score against the state learned from earlier samples only.
      anomalyScore = test(metricValue);
    }
    update(metricValue);

    if (warmedUp) {
      String verdict = anomalyScore > 0.0 ? "Anomaly ->" : "Not an Anomaly ->";
      LOG.info(verdict + modelId() + " - " + "ema = " + ema + ", ems = " + ems +
        ", timessdev = " + timessdev + ", metricValue = " + metricValue);
    } else {
      ctr++;
      if (ctr > suppressAnomaliesTheshold) {
        LOG.info("Ema Model for " + modelId() + " is ready for testing data.");
      }
    }
    return anomalyScore;
  }

  /**
   * Folds the value into the moving average and the moving deviation. The deviation is
   * computed against the already-updated average.
   */
  public void update(double metricValue) {
    ema = weight * ema + (1 - weight) * metricValue;
    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
    LOG.debug("In update : ema = " + ema + ", ems = " + ems);
  }

  /**
   * Scores the value against the current model without modifying it.
   *
   * @return the absolute Z score {@code |value - ema| / ems} when the value is more than
   *         {@code timessdev * ems} away from {@code ema}, otherwise {@code 0.0}. When
   *         {@code ems} is zero any deviating value yields positive infinity.
   */
  public double test(double metricValue) {
    LOG.debug("In test : ema = " + ema + ", ems = " + ems);
    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
    LOG.debug("diff = " + diff);
    if (diff > 0) {
      return Math.abs((metricValue - ema) / ems); //Z score
    } else {
      return 0.0;
    }
  }

  /**
   * Scales {@code timessdev} by the given percentage: down to increase sensitivity, up to
   * decrease it. The weight is left unchanged.
   *
   * @param increaseSensitivity {@code true} to narrow the tolerated band, {@code false} to widen it
   * @param percent             size of the adjustment, in percent of the current {@code timessdev}
   */
  public void updateModel(boolean increaseSensitivity, double percent) {
    LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
    double delta = percent / 100;
    if (increaseSensitivity) {
      delta = delta * -1;
    }
    this.timessdev = timessdev + delta * timessdev;
    //this.weight = Math.min(1.0, weight + delta * weight);
    LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
  }

  public double getWeight() {
    return weight;
  }

  public void setWeight(double weight) {
    this.weight = weight;
  }

  public double getTimessdev() {
    return timessdev;
  }

  public void setTimessdev(double timessdev) {
    this.timessdev = timessdev;
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
// b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
// deleted file mode 100644
// index 62749c1..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
// +++ /dev/null
// @@ -1,46 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.alertservice.prototype.methods.ema;

import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.util.Loader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * {@link Loader} for {@link EmaTechnique}.
 *
 * <p>Nothing is read from {@code path}: every call hands back a brand-new technique built
 * with fixed starting parameters.
 */
public class EmaModelLoader implements Loader<EmaTechnique> {
  private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);

  /** Starting weight handed to every freshly created technique. */
  private static final double INITIAL_WEIGHT = 0.5;
  /** Starting deviation multiplier handed to every freshly created technique. */
  private static final double INITIAL_TIMES_SDEV = 3;

  /**
   * @param sc   ignored
   * @param path ignored
   * @return a new {@link EmaTechnique} with the fixed starting weight and deviation multiplier
   */
  @Override
  public EmaTechnique load(SparkContext sc, String path) {
    // Disabled draft of reading a saved technique back from `path`, kept for reference:
//    Gson gson = new Gson();
//    try {
//      String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
//      return gson.fromJson(fileString, EmaTechnique.class);
//    } catch (IOException e) {
//      LOG.error(e);
//    }
//    return null;
    EmaTechnique freshTechnique = new EmaTechnique(INITIAL_WEIGHT, INITIAL_TIMES_SDEV);
    return freshTechnique;
  }
}
// http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
// ----------------------------------------------------------------------
// diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
// deleted file mode 100644
// index 52c6cf3..0000000
// --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
// +++ /dev/null
// @@ -1,151 +0,0 @@
// /**
//  * Licensed to the Apache Software Foundation (ASF) under one
//  * or more contributor license agreements.  See the NOTICE file
//  * distributed with this work for additional information
//  * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype.methods.ema; - -import com.google.gson.Gson; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.spark.SparkContext; -import org.apache.spark.mllib.util.Saveable; - -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; -import java.io.BufferedWriter; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.io.Writer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@XmlRootElement -public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable { - - @XmlElement(name = "trackedEmas") - private Map<String, EmaModel> trackedEmas; - private static final Log LOG = LogFactory.getLog(EmaTechnique.class); - - private double startingWeight = 0.5; - private double startTimesSdev = 3.0; - private String methodType = "ema"; - public static int suppressAnomaliesTheshold = 100; - - public EmaTechnique(double startingWeight, double 
startTimesSdev, int suppressAnomaliesTheshold) { - trackedEmas = new HashMap<>(); - this.startingWeight = startingWeight; - this.startTimesSdev = startTimesSdev; - EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold; - LOG.info("New EmaTechnique......"); - } - - public EmaTechnique(double startingWeight, double startTimesSdev) { - trackedEmas = new HashMap<>(); - this.startingWeight = startingWeight; - this.startTimesSdev = startTimesSdev; - LOG.info("New EmaTechnique......"); - } - - public List<MetricAnomaly> test(TimelineMetric metric) { - String metricName = metric.getMetricName(); - String appId = metric.getAppId(); - String hostname = metric.getHostName(); - String key = metricName + ":" + appId + ":" + hostname; - - EmaModel emaModel = trackedEmas.get(key); - if (emaModel == null) { - LOG.debug("EmaModel not present for " + key); - LOG.debug("Number of tracked Emas : " + trackedEmas.size()); - emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev); - trackedEmas.put(key, emaModel); - } else { - LOG.debug("EmaModel already present for " + key); - } - - List<MetricAnomaly> anomalies = new ArrayList<>(); - - for (Long timestamp : metric.getMetricValues().keySet()) { - double metricValue = metric.getMetricValues().get(timestamp); - double anomalyScore = emaModel.testAndUpdate(metricValue); - if (anomalyScore > 0.0) { - LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore); - MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); - anomalies.add(metricAnomaly); - } else { - LOG.debug("Discarding non-anomaly for : " + key); - } - } - return anomalies; - } - - public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) { - String metricName = timelineMetric.getMetricName(); - String appId = timelineMetric.getAppId(); - String hostname = timelineMetric.getHostName(); - String key = metricName + "_" + appId + 
"_" + hostname; - - - EmaModel emaModel = trackedEmas.get(key); - - if (emaModel == null) { - LOG.warn("EMA Model for " + key + " not found"); - return false; - } - emaModel.updateModel(increaseSensitivity, percent); - - return true; - } - - @Override - public void save(SparkContext sc, String path) { - Gson gson = new Gson(); - try { - String json = gson.toJson(this); - try (Writer writer = new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(path), "utf-8"))) { - writer.write(json); - } - } catch (IOException e) { - LOG.error(e); - } - } - - @Override - public String formatVersion() { - return "1.0"; - } - - public Map<String, EmaModel> getTrackedEmas() { - return trackedEmas; - } - - public double getStartingWeight() { - return startingWeight; - } - - public double getStartTimesSdev() { - return startTimesSdev; - } - -} - http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java deleted file mode 100644 index 04f4a73..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev; - -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median; -import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev; - -import java.io.Serializable; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -public class HsdevTechnique implements Serializable { - - private Map<String, Double> hsdevMap; - private String methodType = "hsdev"; - private static final Log LOG = LogFactory.getLog(HsdevTechnique.class); - - public HsdevTechnique() { - hsdevMap = new HashMap<>(); - } - - public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) { - int testLength = testData.values.length; - int trainLength = trainData.values.length; - - if (trainLength < testLength) { - LOG.info("Not enough train data."); - return null; - } - - if (!hsdevMap.containsKey(key)) { - hsdevMap.put(key, 3.0); - } - - double n = hsdevMap.get(key); - - double historicSd = sdev(trainData.values, false); - double historicMedian = median(trainData.values); - double currentMedian = 
median(testData.values); - - - if (historicSd > 0) { - double diff = Math.abs(currentMedian - historicMedian); - LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1])); - LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); - - if (diff > n * historicSd) { - double zScore = diff / historicSd; - LOG.info("Z Score of current series : " + zScore); - return new MetricAnomaly(key, - (long) testData.ts[testLength - 1], - testData.values[testLength - 1], - methodType, - zScore); - } - } - - return null; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java deleted file mode 100644 index a9360d3..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.alertservice.prototype.methods.kstest; - -import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class KSTechnique implements Serializable { - - private String methodType = "ks"; - private Map<String, Double> pValueMap; - private static final Log LOG = LogFactory.getLog(KSTechnique.class); - - public KSTechnique() { - pValueMap = new HashMap(); - } - - public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) { - - int testLength = testData.values.length; - int trainLength = trainData.values.length; - - if (trainLength < testLength) { - LOG.info("Not enough train data."); - return null; - } - - if (!pValueMap.containsKey(key)) { - pValueMap.put(key, 0.05); - } - double pValue = pValueMap.get(key); - - ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue))); - if (result == null) { - LOG.error("Resultset is null when invoking KS R function..."); - return null; - } - - if (result.resultset.size() > 0) { - - LOG.info("Is size 1 ? 
result size = " + result.resultset.get(0).length); - LOG.info("p_value = " + result.resultset.get(3)[0]); - double dValue = result.resultset.get(2)[0]; - - return new MetricAnomaly(key, - (long) testData.ts[testLength - 1], - testData.values[testLength - 1], - methodType, - dValue); - } - - return null; - } - - public void updateModel(String metricKey, boolean increaseSensitivity, double percent) { - - LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); - - if (!pValueMap.containsKey(metricKey)) { - LOG.error("Unknown metric key : " + metricKey); - LOG.info("pValueMap :" + pValueMap.toString()); - return; - } - - double delta = percent / 100; - if (!increaseSensitivity) { - delta = delta * -1; - } - - double pValue = pValueMap.get(metricKey); - double newPValue = Math.min(1.0, pValue + delta * pValue); - pValueMap.put(metricKey, newPValue); - LOG.info("New pValue = " + newPValue); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java deleted file mode 100644 index 268cd15..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.alertservice.prototype.testing.utilities; - -import javax.xml.bind.annotation.XmlRootElement; -import java.util.List; -import java.util.Map; - -@XmlRootElement -public class MetricAnomalyDetectorTestInput { - - public MetricAnomalyDetectorTestInput() { - } - - //Train data - private String trainDataName; - private String trainDataType; - private Map<String, String> trainDataConfigs; - private int trainDataSize; - - //Test data - private String testDataName; - private String testDataType; - private Map<String, String> testDataConfigs; - private int testDataSize; - - //Algorithm data - private List<String> methods; - private Map<String, String> methodConfigs; - - public String getTrainDataName() { - return trainDataName; - } - - public void setTrainDataName(String trainDataName) { - this.trainDataName = trainDataName; - } - - public String getTrainDataType() { - return trainDataType; - } - - public void setTrainDataType(String trainDataType) { - this.trainDataType = trainDataType; - } - - public Map<String, String> getTrainDataConfigs() { - return trainDataConfigs; - } - - public void setTrainDataConfigs(Map<String, String> trainDataConfigs) { - this.trainDataConfigs = trainDataConfigs; - } - - public String getTestDataName() { - return 
testDataName; - } - - public void setTestDataName(String testDataName) { - this.testDataName = testDataName; - } - - public String getTestDataType() { - return testDataType; - } - - public void setTestDataType(String testDataType) { - this.testDataType = testDataType; - } - - public Map<String, String> getTestDataConfigs() { - return testDataConfigs; - } - - public void setTestDataConfigs(Map<String, String> testDataConfigs) { - this.testDataConfigs = testDataConfigs; - } - - public Map<String, String> getMethodConfigs() { - return methodConfigs; - } - - public void setMethodConfigs(Map<String, String> methodConfigs) { - this.methodConfigs = methodConfigs; - } - - public int getTrainDataSize() { - return trainDataSize; - } - - public void setTrainDataSize(int trainDataSize) { - this.trainDataSize = trainDataSize; - } - - public int getTestDataSize() { - return testDataSize; - } - - public void setTestDataSize(int testDataSize) { - this.testDataSize = testDataSize; - } - - public List<String> getMethods() { - return methods; - } - - public void setMethods(List<String> methods) { - this.methods = methods; - } -}
