This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 2fdf774dd655ef950e52dcc871b080fc00e65555 Author: Aravindan Vijayan <[email protected]> AuthorDate: Tue Sep 26 14:38:40 2017 -0700 AMBARI-21106 : ML-Prototype: Detect timeseries anomaly for a metric. (Refine PIT & Trend subsystems, Integrate with AMS, Ambari Alerts.) --- .../prototype/AmbariServerInterface.java | 1 - .../prototype/MetricSparkConsumer.java | 113 ++++++++++--- .../prototype/MetricsCollectorInterface.java | 10 +- .../prototype/PointInTimeADSystem.java | 18 +- .../alertservice/prototype/TrendADSystem.java | 26 +-- .../prototype/methods/ema/EmaModel.java | 31 ++-- .../prototype/methods/ema/EmaTechnique.java | 21 ++- .../prototype/methods/hsdev/HsdevTechnique.java | 26 +-- .../src/main/resources/R-scripts/tukeys.r | 17 +- .../src/main/resources/input-config.properties | 24 +++ .../alertservice/prototype/TestEmaTechnique.java | 22 ++- .../metrics/alertservice/prototype/TestTukeys.java | 1 - .../ambari-metrics-grafana/src/main/scripted.js | 118 +++++++++++++ .../metrics/TestMetricSeriesGenerator.java | 87 ++++++++++ .../timeline/HBaseTimelineMetricsService.java | 18 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 122 +++++++++++++- .../timeline/TimelineMetricConfiguration.java | 11 +- .../metrics/timeline/TimelineMetricStore.java | 2 + .../metrics/timeline/query/PhoenixTransactSQL.java | 94 +++++++++++ .../webapp/MetricAnomalyDetectorTestService.java | 87 ++++++++++ .../webapp/TimelineWebServices.java | 36 +++- .../metrics/timeline/TestTimelineMetricStore.java | 5 + .../AMBARI_METRICS/0.1.0/alerts.json | 70 ++++++++ .../alerts/alert_point_in_time_metric_anomalies.py | 185 +++++++++++++++++++++ .../package/alerts/alert_trend_metric_anomalies.py | 185 +++++++++++++++++++++ .../package/alerts/alert_metrics_deviation.py | 4 +- .../metrics/timeline/MetricsPaddingMethodTest.java | 7 + 27 files changed, 1234 insertions(+), 107 deletions(-) diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java index 0c1c6fc..b98f04c 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java @@ -76,7 +76,6 @@ public class AmbariServerInterface implements Serializable{ JSONArray array = jsonObject.getJSONArray("items"); for(int i = 0 ; i < array.length() ; i++){ JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition"); - LOG.info("alertDefn : " + alertDefn.get("name")); if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) { JSONObject sourceNode = alertDefn.getJSONObject("source"); JSONArray params = sourceNode.getJSONArray("parameters"); diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java index 7735d6c..61b3dee 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java @@ -37,6 +37,12 @@ import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; import java.util.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class 
MetricSparkConsumer { @@ -47,38 +53,75 @@ public class MetricSparkConsumer { private static long pitStartTime = System.currentTimeMillis(); private static long ksStartTime = pitStartTime; private static long hdevStartTime = ksStartTime; + private static Set<Pattern> includeMetricPatterns = new HashSet<>(); + private static Set<String> includedHosts = new HashSet<>(); + private static Set<TrendMetric> trendMetrics = new HashSet<>(); public MetricSparkConsumer() { } + public static Properties readProperties(String propertiesFile) { + try { + Properties properties = new Properties(); + InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile); + if (inputStream == null) { + inputStream = new FileInputStream(propertiesFile); + } + properties.load(inputStream); + return properties; + } catch (IOException ioEx) { + LOG.error("Error reading properties file for jmeter"); + return null; + } + } + public static void main(String[] args) throws InterruptedException { - if (args.length < 5) { - System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>"); + if (args.length < 1) { + System.err.println("Usage: MetricSparkConsumer <input-config-file>"); System.exit(1); } - List<String> appIds = Arrays.asList(args[0].split(",")); - String collectorHost = args[1]; - String collectorPort = args[2]; - String collectorProtocol = args[3]; - String zkQuorum = args[4]; + Properties properties = readProperties(args[0]); + + List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(",")); + + String collectorHost = properties.getProperty("collectorHost"); + String collectorPort = properties.getProperty("collectorPort"); + String collectorProtocol = properties.getProperty("collectorProtocol"); - double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5; - double emaN = StringUtils.isNotEmpty(args[8]) ? 
Double.parseDouble(args[6]) : 3; - double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3; + String zkQuorum = properties.getProperty("zkQuorum"); - long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000; - long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000; + double emaW = Double.parseDouble(properties.getProperty("emaW")); + double emaN = Double.parseDouble(properties.getProperty("emaN")); + int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold")); + double tukeysN = Double.parseDouble(properties.getProperty("tukeysN")); - String fileName = args[10]; - long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000; - long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000; - int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3; - long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? 
Long.parseLong(args[14]) : 30 * 60 * 1000; + long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval")); + long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval")); - String ambariServerHost = args[15]; - String clusterName = args[16]; + long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval")); + long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval")); + int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp")); + long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval")); + + String ambariServerHost = properties.getProperty("ambariServerHost"); + String clusterName = properties.getProperty("clusterName"); + + String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns"); + if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) { + String[] patterns = includeMetricPatternStrings.split(","); + for (String p : patterns) { + LOG.info("Included Pattern : " + p); + includeMetricPatterns.add(Pattern.compile(p)); + } + } + + String includedHostList = properties.getProperty("hosts"); + if (includedHostList != null && !includedHostList.isEmpty()) { + String[] hosts = includedHostList.split(","); + includedHosts.addAll(Arrays.asList(hosts)); + } MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort); @@ -86,7 +129,7 @@ public class MetricSparkConsumer { JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); - EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN); + EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold); PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface, tukeysN, pitTestInterval, @@ -97,13 +140,14 @@ public class MetricSparkConsumer { TrendADSystem trendADSystem = new 
TrendADSystem(metricsCollectorInterface, ksTestInterval, ksTrainInterval, - hsdevNhp, - fileName); + hsdevNhp); Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique); Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem); Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem); Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface); + Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns); + Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts); JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); @@ -150,7 +194,7 @@ public class MetricSparkConsumer { if (currentTime > ksStartTime + ksTestInterval) { LOG.info("Running KS Test...."); - trendADSystemBroadcast.getValue().runKSTest(currentTime); + trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics); ksStartTime = ksStartTime + ksTestInterval; } @@ -162,8 +206,27 @@ public class MetricSparkConsumer { TimelineMetrics metrics = tuple2._2(); for (TimelineMetric timelineMetric : metrics.getMetrics()) { - List<MetricAnomaly> anomalies = ema.test(timelineMetric); - metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); + + boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName()); + boolean includeMetric = false; + if (includeHost) { + if (includePatternBroadcast.getValue().isEmpty()) { + includeMetric = true; + } + for (Pattern p : includePatternBroadcast.getValue()) { + Matcher m = p.matcher(timelineMetric.getMetricName()); + if (m.find()) { + includeMetric = true; + } + } + } + + if (includeMetric) { + trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), 
timelineMetric.getAppId(), + timelineMetric.getHostName())); + List<MetricAnomaly> anomalies = ema.test(timelineMetric); + metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); + } } }); }); diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java index 7b3f63d..dab4a0a 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java @@ -96,7 +96,7 @@ public class MetricsCollectorInterface implements Serializable { emitMetrics(timelineMetrics); } } else { - LOG.info("No anomalies to send."); + LOG.debug("No anomalies to send."); } } @@ -130,7 +130,7 @@ public class MetricsCollectorInterface implements Serializable { public boolean emitMetrics(TimelineMetrics metrics) { String connectUrl = constructTimelineMetricUri(); String jsonData = null; - LOG.info("EmitMetrics connectUrl = " + connectUrl); + LOG.debug("EmitMetrics connectUrl = " + connectUrl); try { jsonData = mapper.writeValueAsString(metrics); LOG.info(jsonData); @@ -202,7 +202,7 @@ public class MetricsCollectorInterface implements Serializable { String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId + "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime; - LOG.info("Fetch metrics URL : " + url); + LOG.debug("Fetch metrics URL : " + url); URL obj = null; BufferedReader in = null; @@ -213,8 +213,8 @@ public class MetricsCollectorInterface implements Serializable { HttpURLConnection con = (HttpURLConnection) obj.openConnection(); con.setRequestMethod("GET"); int responseCode = 
con.getResponseCode(); - LOG.info("Sending 'GET' request to URL : " + url); - LOG.info("Response Code : " + responseCode); + LOG.debug("Sending 'GET' request to URL : " + url); + LOG.debug("Response Code : " + responseCode); in = new BufferedReader( new InputStreamReader(con.getInputStream())); diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java index b4a8593..b3e7bd3 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java @@ -49,7 +49,7 @@ public class PointInTimeADSystem implements Serializable { private AmbariServerInterface ambariServerInterface; private int sensitivity = 50; private int minSensitivity = 0; - private int maxSensitivity = 10; + private int maxSensitivity = 100; public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN, long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) { @@ -73,13 +73,13 @@ public class PointInTimeADSystem implements Serializable { if (requiredSensivity > sensitivity) { int targetSensitivity = Math.min(maxSensitivity, requiredSensivity); while (sensitivity < targetSensitivity) { - defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1; + defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05; sensitivity++; } } else { int targetSensitivity = Math.max(minSensitivity, requiredSensivity); while (sensitivity > targetSensitivity) { - defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1; + defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05; sensitivity--; } } @@ -201,10 +201,10 @@ public class 
PointInTimeADSystem implements Serializable { if (recall < 0.5) { LOG.info("Increasing EMA sensitivity by 10%"); - emaModel.updateModel(true, 10); + emaModel.updateModel(true, 5); } else if (precision < 0.5) { LOG.info("Decreasing EMA sensitivity by 10%"); - emaModel.updateModel(false, 10); + emaModel.updateModel(false, 5); } } @@ -233,7 +233,7 @@ public class PointInTimeADSystem implements Serializable { double[] anomalyScore = result.resultset.get(2); for (int i = 0; i < ts.length; i++) { TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname); + timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname); timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys"); timelineMetric.setInstanceId(null); @@ -243,7 +243,11 @@ public class PointInTimeADSystem implements Serializable { HashMap<String, String> metadata = new HashMap<>(); metadata.put("method", "tukeys"); - metadata.put("anomaly-score", String.valueOf(anomalyScore[i])); + if (String.valueOf(anomalyScore[i]).equals("infinity")) { + LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname); + } else { + metadata.put("anomaly-score", String.valueOf(anomalyScore[i])); + } timelineMetric.setMetadata(metadata); timelineMetric.setMetricValues(metricValues); diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java index 1534b55..df36a4a 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java 
@@ -31,11 +31,11 @@ import java.io.FileReader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; public class TrendADSystem implements Serializable { @@ -57,8 +57,7 @@ public class TrendADSystem implements Serializable { public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface, long ksTestIntervalMillis, long ksTrainIntervalMillis, - int hsdevNumHistoricalPeriods, - String inputFileName) { + int hsdevNumHistoricalPeriods) { this.metricsCollectorInterface = metricsCollectorInterface; this.ksTestIntervalMillis = ksTestIntervalMillis; @@ -69,11 +68,9 @@ public class TrendADSystem implements Serializable { this.hsdevTechnique = new HsdevTechnique(); trendMetrics = new ArrayList<>(); - this.inputFile = inputFileName; - readInputFile(inputFileName); } - public void runKSTest(long currentEndTime) { + public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) { readInputFile(inputFile); long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis; @@ -85,7 +82,7 @@ public class TrendADSystem implements Serializable { String metricName = metric.metricName; String appId = metric.appId; String hostname = metric.hostname; - String key = metricName + "_" + appId + "_" + hostname; + String key = metricName + ":" + appId + ":" + hostname; TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis, currentEndTime); @@ -112,6 +109,7 @@ public class TrendADSystem implements Serializable { } } + LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size()); if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { LOG.info("Not enough train/test data to perform KS analysis."); 
continue; @@ -184,6 +182,7 @@ public class TrendADSystem implements Serializable { return timelineMetric; } + public void runHsdevMethod() { List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>(); @@ -315,17 +314,4 @@ public class TrendADSystem implements Serializable { this.hostname = hostname; } } - - /* - boolean isPresent = false; - for (TrendMetric trendMetric : trendMetrics) { - if (trendMetric.metricName.equalsIgnoreCase(splits[0])) { - isPresent = true; - } - } - if (!isPresent) { - LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]); - trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2])); - } - */ } diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java index 5e1f76b..a31410d 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java @@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; +import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold; + @XmlRootElement public class EmaModel implements Serializable { @@ -35,7 +37,6 @@ public class EmaModel implements Serializable { private double timessdev; private int ctr = 0; - private static final int suppressAnomaliesTheshold = 30; private static final Log LOG = LogFactory.getLog(EmaModel.class); @@ -64,30 +65,36 @@ public class EmaModel implements Serializable { public double testAndUpdate(double metricValue) { double anomalyScore = 0.0; + LOG.info("Before Update ->" + metricName + 
":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev); + update(metricValue); if (ctr > suppressAnomaliesTheshold) { anomalyScore = test(metricValue); - } - if (Math.abs(anomalyScore) < 2 * timessdev) { - update(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } else { + LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } } else { - LOG.info("Not updating model for this value"); + ctr++; + if (ctr > suppressAnomaliesTheshold) { + LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data."); + } } - ctr++; - LOG.info("Counter : " + ctr); - LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore); return anomalyScore; } public void update(double metricValue) { ema = weight * ema + (1 - weight) * metricValue; ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0)); - LOG.info("In update : ema = " + ema + ", ems = " + ems); + LOG.debug("In update : ema = " + ema + ", ems = " + ems); } public double test(double metricValue) { - LOG.info("In test : ema = " + ema + ", ems = " + ems); + LOG.debug("In test : ema = " + ema + ", ems = " + ems); double diff = Math.abs(ema - metricValue) - (timessdev * ems); - LOG.info("diff = " + diff); + LOG.debug("diff = " + diff); if (diff > 0) { return Math.abs((metricValue - ema) / ems); //Z score } else { @@ -102,7 +109,7 @@ public class EmaModel implements Serializable { delta = delta * -1; } this.timessdev = timessdev + delta * timessdev; - this.weight = Math.min(1.0, weight + delta * weight); + //this.weight = Math.min(1.0, weight + delta * weight); LOG.info("New model parameters " + 
metricName + " : timessdev = " + timessdev + ", weight = " + weight); } diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java index c005e6f..52c6cf3 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java @@ -49,6 +49,15 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa private double startingWeight = 0.5; private double startTimesSdev = 3.0; private String methodType = "ema"; + public static int suppressAnomaliesTheshold = 100; + + public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold; + LOG.info("New EmaTechnique......"); + } public EmaTechnique(double startingWeight, double startTimesSdev) { trackedEmas = new HashMap<>(); @@ -61,16 +70,16 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa String metricName = metric.getMetricName(); String appId = metric.getAppId(); String hostname = metric.getHostName(); - String key = metricName + "_" + appId + "_" + hostname; + String key = metricName + ":" + appId + ":" + hostname; EmaModel emaModel = trackedEmas.get(key); if (emaModel == null) { - LOG.info("EmaModel not present for " + key); - LOG.info("Number of tracked Emas : " + trackedEmas.size()); + LOG.debug("EmaModel not present for " + key); + LOG.debug("Number of tracked Emas : " + trackedEmas.size()); emaModel = new 
EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev); trackedEmas.put(key, emaModel); } else { - LOG.info("EmaModel already present for " + key); + LOG.debug("EmaModel already present for " + key); } List<MetricAnomaly> anomalies = new ArrayList<>(); @@ -79,11 +88,11 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa double metricValue = metric.getMetricValues().get(timestamp); double anomalyScore = emaModel.testAndUpdate(metricValue); if (anomalyScore > 0.0) { - LOG.info("Found anomaly for : " + key); + LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore); MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); anomalies.add(metricAnomaly); } else { - LOG.info("Discarding non-anomaly for : " + key); + LOG.debug("Discarding non-anomaly for : " + key); } } return anomalies; diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java index 50bf9f2..04f4a73 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java @@ -58,19 +58,23 @@ public class HsdevTechnique implements Serializable { double historicMedian = median(trainData.values); double currentMedian = median(testData.values); - double diff = Math.abs(currentMedian - historicMedian); - LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1])); - LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); - 
if (diff > n * historicSd) { - double zScore = diff / historicSd; - LOG.info("Z Score of current series : " + zScore); - return new MetricAnomaly(key, - (long) testData.ts[testLength - 1], - testData.values[testLength - 1], - methodType, - zScore); + if (historicSd > 0) { + double diff = Math.abs(currentMedian - historicMedian); + LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1])); + LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); + + if (diff > n * historicSd) { + double zScore = diff / historicSd; + LOG.info("Z Score of current series : " + zScore); + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + zScore); + } } + return null; } diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r index f33b6ec..0312226 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r @@ -26,20 +26,23 @@ ams_tukeys <- function(train_data, test_data, n) { anomalies <- data.frame() quantiles <- quantile(train_data[,2]) iqr <- quantiles[4] - quantiles[2] + niqr <- 0 for ( i in 1:length(test_data[,1])) { x <- test_data[i,2] lb <- quantiles[2] - n*iqr ub <- quantiles[4] + n*iqr if ( (x < lb) || (x > ub) ) { - if (x < lb) { - niqr <- (quantiles[2] - x) / iqr - } else { - niqr <- (x - quantiles[4]) / iqr + if (iqr != 0) { + if (x < lb) { + niqr <- (quantiles[2] - x) / iqr + } else { + niqr <- (x - quantiles[4]) / iqr + } + } + anomaly <- c(test_data[i,1], x, niqr) + anomalies <- rbind(anomalies, anomaly) } - anomaly <- c(test_data[i,1], x, niqr) - anomalies <- rbind(anomalies, anomaly) - } } if(length(anomalies) > 0) { names(anomalies) <- 
c("TS", "Value", "niqr") diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties new file mode 100644 index 0000000..88304c7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties @@ -0,0 +1,24 @@ +appIds=HOST + +collectorHost=localhost +collectorPort=6188 +collectorProtocol=http + +zkQuorum=localhost:2181 + +ambariServerHost=localhost +clusterName=c1 + +emaW=0.8 +emaN=3 +tukeysN=3 +pointInTimeTestInterval=300000 +pointInTimeTrainInterval=900000 + +ksTestInterval=600000 +ksTrainInterval=600000 +hsdevNhp=3 +hsdevInterval=1800000; + +skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime +hosts=avijayan-ad-1.openstacklocal \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java index 539ca40..d1e2b41 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java @@ -21,21 +21,41 @@ import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; import java.util.List; import java.util.TreeMap; +import static 
org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS; + public class TestEmaTechnique { + private static double[] ts; + private static String fullFilePath; + + @BeforeClass + public static void init() throws URISyntaxException { + + Assume.assumeTrue(System.getenv("R_HOME") != null); + ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + } + @Test public void testEmaInitialization() { EmaTechnique ema = new EmaTechnique(0.5, 3); Assert.assertTrue(ema.getTrackedEmas().isEmpty()); Assert.assertTrue(ema.getStartingWeight() == 0.5); - Assert.assertTrue(ema.getStartTimesSdev() == 3); + Assert.assertTrue(ema.getStartTimesSdev() == 2); } @Test diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java index bb409cf..ef0125f 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java @@ -21,7 +21,6 @@ import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; diff --git a/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js b/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js new file mode 100644 index 0000000..298535f --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* global _ */ + +/* + * Complex scripted dashboard + * This script generates a dashboard object that Grafana can load. 
It also takes a number of user + * supplied URL parameters (in the ARGS variable) + * + * Return a dashboard object, or a function + * + * For async scripts, return a function, this function must take a single callback function as argument, + * call this callback function with the dashboard object (look at scripted_async.js for an example) + */ + +'use strict'; + +// accessible variables in this scope +var window, document, ARGS, $, jQuery, moment, kbn; + +// Setup some variables +var dashboard; + +// All url parameters are available via the ARGS object +var ARGS; + +// Intialize a skeleton with nothing but a rows array and service object +dashboard = { + rows : [], +}; + +// Set a title +dashboard.title = 'Scripted dash'; + +// Set default time +// time can be overriden in the url using from/to parameters, but this is +// handled automatically in grafana core during dashboard initialization + + +var obj = JSON.parse(ARGS.anomalies); +var metrics = obj.metrics; +var rows = metrics.length + +dashboard.time = { + from: "now-1h", + to: "now" +}; + +var metricSet = new Set(); + +for (var i = 0; i < rows; i++) { + + var key = metrics[i].metricname; + if (metricSet.has(key)) { + continue; + } + metricSet.add(key) + var metricKeyElements = key.split(":"); + var metricName = metricKeyElements[0]; + var appId = metricKeyElements[1]; + var hostname = metricKeyElements[2]; + + dashboard.rows.push({ + title: 'Chart', + height: '300px', + panels: [ + { + title: metricName, + type: 'graph', + span: 12, + fill: 1, + linewidth: 2, + targets: [ + { + "aggregator": "none", + "alias": metricName, + "app": appId, + "errors": {}, + "metric": metricName, + "precision": "default", + "refId": "A", + "hosts": hostname + } + ], + seriesOverrides: [ + { + alias: '/random/', + yaxis: 2, + fill: 0, + linewidth: 5 + } + ], + tooltip: { + shared: true + } + } + ] + }); +} + + +return dashboard; diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java new file mode 100644 index 0000000..2420ef3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java @@ -0,0 +1,87 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics; + +import org.apache.ambari.metrics.alertservice.prototype.TestSeriesInputRequest; +import org.apache.ambari.metrics.alertservice.seriesgenerator.AbstractMetricSeries; +import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +public class TestMetricSeriesGenerator implements Runnable { + + private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>(); + private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class); + private TimelineMetricStore metricStore; + private String hostname; + + public TestMetricSeriesGenerator(TimelineMetricStore metricStore) { + this.metricStore = metricStore; + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + public void 
addSeries(TestSeriesInputRequest inputRequest) { + if (!configuredSeries.containsKey(inputRequest)) { + AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs()); + configuredSeries.put(inputRequest, metricSeries); + LOG.info("Added series " + inputRequest.getSeriesName()); + } + } + + public void removeSeries(String seriesName) { + boolean isPresent = false; + TestSeriesInputRequest tbd = null; + for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) { + if (inputRequest.getSeriesName().equals(seriesName)) { + isPresent = true; + tbd = inputRequest; + } + } + if (isPresent) { + LOG.info("Removing series " + seriesName); + configuredSeries.remove(tbd); + } else { + LOG.info("Series not found : " + seriesName); + } + } + + @Override + public void run() { + long currentTime = System.currentTimeMillis(); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + for (TestSeriesInputRequest input : configuredSeries.keySet()) { + AbstractMetricSeries metricSeries = configuredSeries.get(input); + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(input.getSeriesName()); + timelineMetric.setAppId("anomaly-engine-test-metric"); + timelineMetric.setInstanceId(null); + timelineMetric.setStartTime(currentTime); + timelineMetric.setHostName(hostname); + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(currentTime, metricSeries.nextValue()); + timelineMetric.setMetricValues(metricValues); + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + LOG.info("Emitting metric with appId = " + timelineMetric.getAppId()); + } + try { + LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series."); + metricStore.putMetrics(timelineMetrics); + } catch (Exception e) { + LOG.error(e); + } + } +} diff --git
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index 95682f9..4450d65 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -157,6 +157,10 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time "start cache node", e); } } +// String kafkaServers = configuration.getKafkaServers(); +// if (kafkaServers != null) { +// metricKafkaProducer = new MetricKafkaProducer(kafkaServers); +// } defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { @@ -235,6 +239,11 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } @Override + public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException { + return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit); + } + + @Override public TimelineMetrics getTimelineMetrics(List<String> metricNames, List<String> hostnames, String applicationId, String instanceId, Long startTime, Long endTime, Precision precision, Integer limit, @@ -403,10 +412,17 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time cache.putMetrics(metrics.getMetrics(), metricMetadataManager); } +// try { +// metricKafkaProducer.sendMetrics(metrics); +//// if (metrics.getMetrics().size() != 0 && 
metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) { +//// } +// } catch (Exception e) { +// LOG.error(e); +// } + return response; } - @Override public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException { diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index da14fd1..f470c58 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; @@ -35,7 +34,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES; @@ -50,15 +48,18 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ANOMALY_METRICS_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_ANOMALY_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL; @@ -73,7 +74,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; 
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TREND_ANOMALY_METRICS_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_ANOMALY_METRICS_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; @@ -81,6 +84,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_TREND_ANOMALY_METRICS_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS; import java.io.IOException; @@ -309,11 +313,63 @@ public class 
PhoenixHBaseAccessor { commitMetrics(Collections.singletonList(timelineMetrics)); } + private void commitAnomalyMetric(Connection conn, TimelineMetric metric) { + PreparedStatement metricRecordStmt = null; + try { + + Map<String, String> metricMetadata = metric.getMetadata(); + + + byte[] uuid = metadataManagerInstance.getUuid(metric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString()); + return; + } + + if (metric.getAppId().equals("anomaly-engine-ks") || metric.getAppId().equals("anomaly-engine-hsdev")) { + metricRecordStmt = conn.prepareStatement(String.format(UPSERT_TREND_ANOMALY_METRICS_SQL, + TREND_ANOMALY_METRICS_TABLE_NAME)); + + metricRecordStmt.setBytes(1, uuid); + metricRecordStmt.setLong(2, metric.getStartTime()); + metricRecordStmt.setLong(3, Long.parseLong(metricMetadata.get("test-start-time"))); + metricRecordStmt.setLong(4, Long.parseLong(metricMetadata.get("train-start-time"))); + metricRecordStmt.setLong(5, Long.parseLong(metricMetadata.get("train-end-time"))); + String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); + metricRecordStmt.setString(6, json); + metricRecordStmt.setString(7, metric.getMetadata().get("method")); + double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? Double.parseDouble(metric.getMetadata().get("anomaly-score")) : 0.0; + metricRecordStmt.setDouble(8, anomalyScore); + + } else { + metricRecordStmt = conn.prepareStatement(String.format( + UPSERT_ANOMALY_METRICS_SQL, ANOMALY_METRICS_TABLE_NAME)); + + metricRecordStmt.setBytes(1, uuid); + metricRecordStmt.setLong(2, metric.getStartTime()); + String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); + metricRecordStmt.setString(3, json); + metricRecordStmt.setString(4, metric.getMetadata().get("method")); + double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? 
Double.parseDouble(metric.getMetadata().get("anomaly-score")) : 0.0; + metricRecordStmt.setDouble(5, anomalyScore); + } + + try { + metricRecordStmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error("Failed on insert records to store.", sql); + } + + } catch (Exception e) { + LOG.error("Failed on insert records to anomaly table.", e); + } + + } + public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) { LOG.debug("Committing metrics to store"); Connection conn = null; PreparedStatement metricRecordStmt = null; - long currentTime = System.currentTimeMillis(); try { conn = getConnection(); @@ -321,6 +377,10 @@ public class PhoenixHBaseAccessor { UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); for (TimelineMetrics timelineMetrics : timelineMetricsCollection) { for (TimelineMetric metric : timelineMetrics.getMetrics()) { + if (metric.getAppId().startsWith("anomaly-engine") && !metric.getAppId().equals("anomaly-engine-test-metric")) { + commitAnomalyMetric(conn, metric); + } + metricRecordStmt.clearParameters(); if (LOG.isTraceEnabled()) { @@ -469,6 +529,20 @@ public class PhoenixHBaseAccessor { stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL, encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression)); + //Anomaly Metrics + stmt.executeUpdate(String.format(CREATE_ANOMALY_METRICS_TABLE_SQL, + ANOMALY_METRICS_TABLE_NAME, + encoding, + tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME), + compression)); + + //Trend Anomaly Metrics + stmt.executeUpdate(String.format(CREATE_TREND_ANOMALY_METRICS_TABLE_SQL, + TREND_ANOMALY_METRICS_TABLE_NAME, + encoding, + tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME), + compression)); + // Host level String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression); @@ -842,6 +916,48 @@ public class PhoenixHBaseAccessor { insertMetricRecords(metrics, false); } + public TimelineMetrics getAnomalyMetricRecords(String 
method, long startTime, long endTime, Integer limit) throws SQLException { + Connection conn = getConnection(); + PreparedStatement stmt = null; + ResultSet rs = null; + TimelineMetrics metrics = new TimelineMetrics(); + try { + stmt = PhoenixTransactSQL.prepareAnomalyMetricsGetSqlStatement(conn, method, startTime, endTime, limit); + rs = stmt.executeQuery(); + while (rs.next()) { + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid); + + if (method.equals("ks") || method.equals("hsdev")) { + metric.setStartTime(rs.getLong("TEST_END_TIME")); + } else { + metric.setStartTime(rs.getLong("SERVER_TIME")); + } + metric.setInstanceId(null); + + HashMap<String, String> metadata = new HashMap<>(); + metadata.put("method", rs.getString("METHOD")); + metadata.put("anomaly-score", String.valueOf(rs.getDouble("ANOMALY_SCORE"))); + if (method.equals("ks") || method.equals("hsdev")) { + metadata.put("test-start-time", String.valueOf(rs.getLong("TEST_START_TIME"))); + metadata.put("train-start-time", String.valueOf(rs.getLong("TRAIN_START_TIME"))); + metadata.put("train-end-time", String.valueOf(rs.getLong("TRAIN_END_TIME"))); + } + metric.setMetadata(metadata); + + TreeMap<Long, Double> sortedByTimeMetrics = readMetricFromJSON(rs.getString("METRICS")); + metric.setMetricValues(sortedByTimeMetrics); + + metrics.getMetrics().add(metric); + } + } catch (Exception ex) { + LOG.error(ex); + } + return metrics; + } + + @SuppressWarnings("unchecked") public TimelineMetrics getMetricRecords( final Condition condition, Multimap<String, List<Function>> metricFunctions) diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 258e9c6..85dad1f 
100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -322,16 +322,14 @@ public class TimelineMetricConfiguration { public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES = "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles"; -<<<<<<< HEAD public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS = "timeline.metrics.support.multiple.clusters"; public static final String TIMELINE_METRICS_EVENT_METRIC_PATTERNS = "timeline.metrics.downsampler.event.metric.patterns"; -======= + public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY = "timeline.metrics.uuid.gen.strategy"; ->>>>>>> AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. 
(avijayan) public static final String HOST_APP_ID = "HOST"; @@ -534,6 +532,13 @@ public class TimelineMetricConfiguration { return defaultRpcAddress; } + public String getKafkaServers() { + if (metricsConf != null) { + return metricsConf.get("timeline.metrics.kafka.servers", null); + } + return null; + } + public boolean isDistributedCollectorModeDisabled() { try { if (getMetricsConf() != null) { diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index dab4494..cdeefdc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -107,4 +107,6 @@ public interface TimelineMetricStore { * @return [ hostname ] */ List<String> getLiveInstances(); + + TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException; } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 2478fb1..75a9d28 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import java.util.List; import java.util.concurrent.TimeUnit; @@ -37,6 +38,27 @@ public class PhoenixTransactSQL { public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + public static final String CREATE_ANOMALY_METRICS_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + + "(UUID BINARY(20) NOT NULL, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "METRICS VARCHAR, " + + "METHOD VARCHAR, " + + "ANOMALY_SCORE DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_TREND_ANOMALY_METRICS_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + + "(UUID BINARY(20) NOT NULL, " + + "TEST_START_TIME UNSIGNED_LONG NOT NULL, " + + "TEST_END_TIME UNSIGNED_LONG NOT NULL, " + + "TRAIN_START_TIME UNSIGNED_LONG, " + + "TRAIN_END_TIME UNSIGNED_LONG, " + + "METRICS VARCHAR, " + + "METHOD VARCHAR, " + + "ANOMALY_SCORE DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (UUID, TEST_START_TIME, TEST_END_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'"; + /** * Create table to store individual metric records. 
*/ @@ -146,6 +168,25 @@ public class PhoenixTransactSQL { */ public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s"; + public static final String UPSERT_ANOMALY_METRICS_SQL = "UPSERT INTO %s " + + "(UUID, " + + "SERVER_TIME, " + + "METRICS, " + + "METHOD, " + + "ANOMALY_SCORE) VALUES " + + "(?, ?, ?, ?, ?)"; + + public static final String UPSERT_TREND_ANOMALY_METRICS_SQL = "UPSERT INTO %s " + + "(UUID, " + + "TEST_START_TIME, " + + "TEST_END_TIME, " + + "TRAIN_START_TIME, " + + "TRAIN_END_TIME, " + + "METRICS, " + + "METHOD, " + + "ANOMALY_SCORE) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?)"; + /** * Insert into metric records table. */ @@ -221,6 +262,22 @@ public class PhoenixTransactSQL { public static final String UPSERT_INSTANCE_HOST_METADATA_SQL = "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)"; + public static final String GET_ANOMALY_METRIC_SQL = "SELECT UUID, SERVER_TIME, " + + "METRICS, " + + "METHOD, " + + "ANOMALY_SCORE " + + "FROM %s " + + "WHERE METHOD = ? AND SERVER_TIME > ? AND SERVER_TIME <= ? ORDER BY ANOMALY_SCORE DESC"; + + public static final String GET_TREND_ANOMALY_METRIC_SQL = "SELECT UUID, " + + "TEST_START_TIME, TEST_END_TIME, " + + "TRAIN_START_TIME, TRAIN_END_TIME, " + + "METRICS, " + + "METHOD, " + + "ANOMALY_SCORE " + + "FROM %s " + + "WHERE METHOD = ? AND TEST_END_TIME > ? AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC"; + /** * Retrieve a set of rows from metrics records table. 
*/ @@ -345,6 +402,9 @@ public class PhoenixTransactSQL { "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " + "SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS"; + public static final String ANOMALY_METRICS_TABLE_NAME = "METRIC_ANOMALIES"; + public static final String TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES"; + public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS"; @@ -407,6 +467,40 @@ public class PhoenixTransactSQL { PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled; } + public static PreparedStatement prepareAnomalyMetricsGetSqlStatement(Connection connection, String method, + long startTime, long endTime, Integer limit) throws SQLException { + StringBuilder sb = new StringBuilder(); + if (method.equals("ema") || method.equals("tukeys")) { + sb.append(String.format(GET_ANOMALY_METRIC_SQL, ANOMALY_METRICS_TABLE_NAME)); + } else { + sb.append(String.format(GET_TREND_ANOMALY_METRIC_SQL, TREND_ANOMALY_METRICS_TABLE_NAME)); + } + if (limit != null) { + sb.append(" LIMIT " + limit); + } + PreparedStatement stmt = null; + try { + stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + + stmt.setString(pos++, method); + stmt.setLong(pos++, startTime); + stmt.setLong(pos, endTime); + if (limit != null) { + stmt.setFetchSize(limit); + } + + } catch (SQLException e) { + if (stmt != null) { + stmt.close(); + } + throw e; + } + + return stmt; + } + + public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection, Condition condition) throws SQLException { diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java new file mode 100644 index 0000000..6f7b14a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Singleton +@Path("/ws/v1/metrictestservice") +public class MetricAnomalyDetectorTestService { + + private static final Log LOG = LogFactory.getLog(MetricAnomalyDetectorTestService.class); + + @Inject + public MetricAnomalyDetectorTestService() { + } + + private void init(HttpServletResponse response) { + response.setContentType(null); + } + + @Path("/anomaly") + @POST + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public TimelinePutResponse postAnomalyDetectionRequest( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + MetricAnomalyDetectorTestInput input) { + + init(res); + if (input == null) { + return new TimelinePutResponse(); + } + + try { + return null; + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @GET + @Path("/dataseries") + @Produces({MediaType.APPLICATION_JSON}) + public TimelineMetrics getTestDataSeries( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("type") String seriesType, + @QueryParam("configs") String config + ) { + 
return null; + } +} diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 472a787..20aba23 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; @@ -37,6 +36,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.TestMetricSeriesGenerator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier; @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.webapp.BadRequestException; import javax.servlet.http.HttpServletRequest; import 
javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -75,6 +76,10 @@ import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; @@ -389,7 +394,7 @@ public class TimelineWebServices { } return timelineMetricStore.getTimelineMetrics( - parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId, + parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId), parseLongStr(startTime), parseLongStr(endTime), Precision.getPrecision(precision), parseIntStr(limit), parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN), @@ -412,6 +417,25 @@ public class TimelineWebServices { } @GET + @Path("/metrics/anomalies") + @Produces({ MediaType.APPLICATION_JSON }) + public TimelineMetrics getAnomalyMetrics( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + @QueryParam("method") String method, + @QueryParam("startTime") String startTime, + @QueryParam("endTime") String endTime, + @QueryParam("limit") String limit + ) { + init(res); + + try { + return timelineMetricStore.getAnomalyMetrics(method, parseLongStr(startTime), parseLongStr(endTime), parseIntStr(limit)); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + @GET @Path("/metrics/metadata") @Produces({ MediaType.APPLICATION_JSON }) public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata( @@ -660,6 +684,12 @@ public class TimelineWebServices { } private static String parseStr(String str) { - return str == null ? 
null : str.trim(); + String trimmedInstance = (str == null) ? null : str.trim(); + if (trimmedInstance != null) { + if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) { + trimmedInstance = null; + } + } + return trimmedInstance; } } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index 07e0daa..7c879e1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -119,4 +119,9 @@ public class TestTimelineMetricStore implements TimelineMetricStore { return null; } + @Override + public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) { + return null; + } + } diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json index e41adb5..acecb62 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json @@ -142,6 +142,76 @@ "value": "{0} * 100" } } + }, + { + "name": "point_in_time_metrics_anomalies", + "label": "Point in Time metric anomalies", + "description": "This service-level alert triggers if there are metric anomalies in the last 10 mins or configured interval.", + "interval": 10, + "scope": "ANY", + "enabled": true, + "source": { + "type": "SCRIPT", + "path": 
"AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py", + "parameters": [ + { + "name": "num_anomalies", + "display_name": "Value of N in Top 'N' anomalies to be reported.", + "value": 5, + "type": "NUMERIC", + "description": "Report only this amount of anomalies." + }, + { + "name": "interval", + "display_name": "Query Time interval in minutes", + "value": 10, + "type": "NUMERIC", + "description": "Query Time interval in minutes." + }, + { + "name": "sensitivity", + "display_name": "Alert Sensitivity", + "value": 50, + "type": "NUMERIC", + "description": "Sensitivity of the alert. Scale of 1 - 100. Default = 50." + } + ] + } + }, + { + "name": "trend_metrics_anomalies", + "label": "Trend metric anomalies", + "description": "This service-level alert triggers if there are metric anomalies in the last 10 mins or configured interval.", + "interval": 10, + "scope": "ANY", + "enabled": true, + "source": { + "type": "SCRIPT", + "path": "AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py", + "parameters": [ + { + "name": "num_anomalies", + "display_name": "Value of N in Top 'N' anomalies to be reported.", + "value": 5, + "type": "NUMERIC", + "description": "Report only this amount of anomalies." + }, + { + "name": "interval", + "display_name": "Query Time interval in minutes", + "value": 10, + "type": "NUMERIC", + "description": "Query Time interval in minutes." + }, + { + "name": "sensitivity", + "display_name": "Alert Sensitivity", + "value": 50, + "type": "NUMERIC", + "description": "Sensitivity of the alert. Scale of 1 - 100. Default = 50." 
+ } + ] + } } ], "METRICS_MONITOR": [ diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py new file mode 100644 index 0000000..154ce1c --- /dev/null +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import json +import urllib +import time +import os +import ambari_commons.network as network +import logging + +from ambari_agent.AmbariConfig import AmbariConfig + +RESULT_STATE_OK = 'OK' +RESULT_STATE_CRITICAL = 'CRITICAL' +RESULT_STATE_WARNING = 'WARNING' +RESULT_STATE_UNKNOWN = 'UNKNOWN' +RESULT_STATE_SKIPPED = 'SKIPPED' + +AMS_HTTP_POLICY = '{{ams-site/timeline.metrics.service.http.policy}}' +METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}' +METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}' +METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}' + +INTERVAL_PARAM_KEY = 'interval' +INTERVAL_PARAM_DEFAULT = 10 + +NUM_ANOMALIES_KEY = 'num_anomalies' +NUM_ANOMALIES_DEFAULT = 5 + +SENSITIVITY_KEY = 'sensitivity' +SENSITIVITY_DEFAULT = 5 + +AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics/anomalies?%s" + +logger = logging.getLogger() + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY, + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, AMS_HTTP_POLICY) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations (dictionary): a mapping of configuration key to value + parameters (dictionary): a mapping of script parameter key to value + host_name (string): the name of this host where the alert is running + """ + + """ + Get ready with AMS GET url. + Query AMS for point in time anomalies in the last 30mins. + Generate a message with anomalies. 
+ """ + if configurations is None: + return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) + + collector_host = host_name + current_time = int(time.time()) * 1000 + + interval = INTERVAL_PARAM_DEFAULT + if INTERVAL_PARAM_KEY in parameters: + interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY]) + + num_anomalies = NUM_ANOMALIES_DEFAULT + if NUM_ANOMALIES_KEY in parameters: + num_anomalies = _coerce_to_integer(parameters[NUM_ANOMALIES_KEY]) + + sensitivity = SENSITIVITY_DEFAULT + if SENSITIVITY_KEY in parameters: + sensitivity = _coerce_to_integer(parameters[SENSITIVITY_KEY]) + + if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations: + collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY] + collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY]) + else: + # ams-site/timeline.metrics.service.webapp.address is required + if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations: + return (RESULT_STATE_UNKNOWN, + ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)]) + else: + collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":") + if valid_collector_webapp_address(collector_webapp_address): + collector_port = int(collector_webapp_address[1]) + else: + return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format( + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])]) + + get_ema_anomalies_parameters = { + "method": "ema", + "startTime": current_time - interval * 60 * 1000, + "endTime": current_time, + "limit": num_anomalies + 1 + } + + encoded_get_metrics_parameters = urllib.urlencode(get_ema_anomalies_parameters) + + ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf" + metric_truststore_ca_certs = 'ca.pem' + ca_certs = os.path.join(ams_collector_conf_dir, + 
metric_truststore_ca_certs) + metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY" + + try: + conn = network.get_http_connection( + collector_host, + int(collector_port), + metric_collector_https_enabled, + ca_certs, + ssl_version=AmbariConfig.get_resolved_config().get_force_https_protocol_value() + ) + conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters) + response = conn.getresponse() + data = response.read() + logger.info("Data read from metric anomaly endpoint") + logger.info(data) + conn.close() + except Exception: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."]) + + if response.status != 200: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."]) + + data_json = json.loads(data) + length = len(data_json["metrics"]) + logger.info("Number of anomalies returned : {0}".format(length)) + + if length == 0: + alert_state = RESULT_STATE_OK + alert_label = 'No point in time anomalies in the last {0} minutes.'.format(interval) + logger.info(alert_label) + elif length <= 5: + alert_state = RESULT_STATE_WARNING + alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data + else: + alert_state = RESULT_STATE_CRITICAL + alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data + + return (alert_state, [alert_label]) + + +def valid_collector_webapp_address(webapp_address): + if len(webapp_address) == 2 \ + and webapp_address[0] != '127.0.0.1' \ + and webapp_address[1].isdigit(): + return True + + return False + + +def _coerce_to_integer(value): + """ + Attempts to correctly coerce a value to an integer. For the case of an integer or a float, + this will essentially either NOOP or return a truncated value. If the parameter is a string, + then it will first attempt to be coerced from a integer, and failing that, a float. 
+ :param value: the value to coerce + :return: the coerced value as an integer + """ + try: + return int(value) + except ValueError: + return int(float(value)) diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py new file mode 100644 index 0000000..8813d8e --- /dev/null +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import json +import urllib +import time +import os +import ambari_commons.network as network +import logging + +from ambari_agent.AmbariConfig import AmbariConfig + +RESULT_STATE_OK = 'OK' +RESULT_STATE_CRITICAL = 'CRITICAL' +RESULT_STATE_WARNING = 'WARNING' +RESULT_STATE_UNKNOWN = 'UNKNOWN' +RESULT_STATE_SKIPPED = 'SKIPPED' + +AMS_HTTP_POLICY = '{{ams-site/timeline.metrics.service.http.policy}}' +METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}' +METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}' +METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}' + +INTERVAL_PARAM_KEY = 'interval' +INTERVAL_PARAM_DEFAULT = 10 + +NUM_ANOMALIES_KEY = 'num_anomalies' +NUM_ANOMALIES_DEFAULT = 5 + +SENSITIVITY_KEY = 'sensitivity' +SENSITIVITY_DEFAULT = 5 + +AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics/anomalies?%s" + +logger = logging.getLogger() + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY, + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, AMS_HTTP_POLICY) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations (dictionary): a mapping of configuration key to value + parameters (dictionary): a mapping of script parameter key to value + host_name (string): the name of this host where the alert is running + """ + + """ + Get ready with AMS GET url. + Query AMS for point in time anomalies in the last 30mins. + Generate a message with anomalies. 
+ """ + if configurations is None: + return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) + + collector_host = host_name + current_time = int(time.time()) * 1000 + + interval = INTERVAL_PARAM_DEFAULT + if INTERVAL_PARAM_KEY in parameters: + interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY]) + + num_anomalies = NUM_ANOMALIES_DEFAULT + if NUM_ANOMALIES_KEY in parameters: + num_anomalies = _coerce_to_integer(parameters[NUM_ANOMALIES_KEY]) + + sensitivity = SENSITIVITY_DEFAULT + if SENSITIVITY_KEY in parameters: + sensitivity = _coerce_to_integer(parameters[SENSITIVITY_KEY]) + + if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations: + collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY] + collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY]) + else: + # ams-site/timeline.metrics.service.webapp.address is required + if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations: + return (RESULT_STATE_UNKNOWN, + ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)]) + else: + collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":") + if valid_collector_webapp_address(collector_webapp_address): + collector_port = int(collector_webapp_address[1]) + else: + return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format( + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])]) + + get_ema_anomalies_parameters = { + "method": "ks", + "startTime": current_time - interval * 60 * 1000, + "endTime": current_time, + "limit": num_anomalies + 1 + } + + encoded_get_metrics_parameters = urllib.urlencode(get_ema_anomalies_parameters) + + ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf" + metric_truststore_ca_certs = 'ca.pem' + ca_certs = os.path.join(ams_collector_conf_dir, + metric_truststore_ca_certs) 
+ metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY" + + try: + conn = network.get_http_connection( + collector_host, + int(collector_port), + metric_collector_https_enabled, + ca_certs, + ssl_version=AmbariConfig.get_resolved_config().get_force_https_protocol_value() + ) + conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters) + response = conn.getresponse() + data = response.read() + logger.info("Data read from metric anomaly endpoint") + logger.info(data) + conn.close() + except Exception: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."]) + + if response.status != 200: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."]) + + data_json = json.loads(data) + length = len(data_json["metrics"]) + logger.info("Number of anomalies returned : {0}".format(length)) + + if length == 0: + alert_state = RESULT_STATE_OK + alert_label = 'No trend anomalies in the last {0} minutes.'.format(interval) + logger.info(alert_label) + elif length <= 5: + alert_state = RESULT_STATE_WARNING + alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data + else: + alert_state = RESULT_STATE_CRITICAL + alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data + + return (alert_state, [alert_label]) + + +def valid_collector_webapp_address(webapp_address): + if len(webapp_address) == 2 \ + and webapp_address[0] != '127.0.0.1' \ + and webapp_address[1].isdigit(): + return True + + return False + + +def _coerce_to_integer(value): + """ + Attempts to correctly coerce a value to an integer. For the case of an integer or a float, + this will essentially either NOOP or return a truncated value. If the parameter is a string, + then it will first attempt to be coerced from a integer, and failing that, a float. 
+ :param value: the value to coerce + :return: the coerced value as an integer + """ + try: + return int(value) + except ValueError: + return int(float(value)) diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py index 7f64c80..3f8eb2e 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py @@ -331,10 +331,12 @@ def execute(configurations={}, parameters={}, host_name=None): response = conn.getresponse() data = response.read() conn.close() - except Exception: + except Exception, e: + logger.info(str(e)) return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."]) if response.status != 200: + logger.info(str(data)) return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."]) data_json = json.loads(data) diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java index b57f7e9..e66e5b8 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java @@ -39,6 +39,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now - 1000, 1.0d); inputValues.put(now - 2000, 2.0d); @@ -66,6 +67,7 @@ public class 
MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now - 1000, 1.0d); inputValues.put(now - 2000, 2.0d); @@ -93,6 +95,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now, 0.0d); inputValues.put(now - 1000, 1.0d); @@ -120,6 +123,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now - 1000, 1.0d); timelineMetric.setMetricValues(inputValues); @@ -145,6 +149,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now - 1000, 1.0d); timelineMetric.setMetricValues(inputValues); @@ -168,6 +173,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); long seconds = 1000; @@ -228,6 +234,7 @@ public class MetricsPaddingMethodTest { timelineMetric.setMetricName("m1"); timelineMetric.setHostName("h1"); timelineMetric.setAppId("a1"); + timelineMetric.setStartTime(now); TreeMap<Long, Double> inputValues = new TreeMap<>(); inputValues.put(now - 100, 1.0d); inputValues.put(now - 200, 2.0d); -- To stop receiving notification emails like this one, please contact [email protected].
