AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e33b5455 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e33b5455 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e33b5455 Branch: refs/heads/branch-3.0-ams Commit: e33b5455787e59c077902cdc7f375ce33c434268 Parents: 97dfe6b Author: Aravindan Vijayan <[email protected]> Authored: Wed Sep 27 10:43:48 2017 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Wed Sep 27 11:46:04 2017 -0700 ---------------------------------------------------------------------- .../ambari-metrics-alertservice/pom.xml | 149 -------- .../prototype/AmbariServerInterface.java | 121 ------ .../MetricAnomalyDetectorTestInput.java | 126 ------ .../prototype/MetricAnomalyTester.java | 163 -------- .../prototype/MetricKafkaProducer.java | 56 --- .../prototype/MetricSparkConsumer.java | 241 ------------ .../prototype/MetricsCollectorInterface.java | 237 ------------ .../prototype/PointInTimeADSystem.java | 260 ------------- .../prototype/RFunctionInvoker.java | 222 ----------- .../prototype/TestSeriesInputRequest.java | 88 ----- .../alertservice/prototype/TrendADSystem.java | 317 ---------------- .../alertservice/prototype/TrendMetric.java | 33 -- .../prototype/common/DataSeries.java | 38 -- .../prototype/common/ResultSet.java | 43 --- .../prototype/common/StatisticUtils.java | 62 --- .../methods/AnomalyDetectionTechnique.java | 32 -- .../prototype/methods/MetricAnomaly.java | 86 ----- .../prototype/methods/ema/EmaModel.java | 131 ------- .../prototype/methods/ema/EmaModelLoader.java | 46 --- .../prototype/methods/ema/EmaTechnique.java | 151 -------- .../prototype/methods/hsdev/HsdevTechnique.java | 81 ---- .../prototype/methods/kstest/KSTechnique.java | 101 ----- .../seriesgenerator/AbstractMetricSeries.java | 25 -- .../seriesgenerator/DualBandMetricSeries.java | 88 ----- .../MetricSeriesGeneratorFactory.java | 379 ------------------- .../seriesgenerator/MonotonicMetricSeries.java | 101 ----- .../seriesgenerator/NormalMetricSeries.java | 81 ---- .../SteadyWithTurbulenceMetricSeries.java | 115 ------ .../StepFunctionMetricSeries.java | 107 ------ .../seriesgenerator/UniformMetricSeries.java | 95 ----- .../src/main/resources/R-scripts/ema.R | 96 ----- .../src/main/resources/R-scripts/hsdev.r | 67 ---- .../src/main/resources/R-scripts/iforest.R | 52 --- .../src/main/resources/R-scripts/kstest.r | 38 -- .../src/main/resources/R-scripts/test.R | 85 ----- .../src/main/resources/R-scripts/tukeys.r | 51 --- .../src/main/resources/input-config.properties | 42 -- .../prototype/TestEmaTechnique.java | 106 ------ .../prototype/TestRFunctionInvoker.java | 161 -------- .../alertservice/prototype/TestTukeys.java | 99 ----- .../MetricSeriesGeneratorTest.java | 108 ------ .../ambari-metrics-anomaly-detector/pom.xml | 205 ++++++++++ .../prototype/common/DataSeries.java | 38 ++ .../prototype/common/ResultSet.java | 43 +++ .../prototype/common/StatisticUtils.java | 62 +++ .../prototype/core/AmbariServerInterface.java | 121 ++++++ .../prototype/core/MetricKafkaProducer.java | 56 +++ .../prototype/core/MetricSparkConsumer.java | 239 ++++++++++++ .../core/MetricsCollectorInterface.java | 237 ++++++++++++ .../prototype/core/PointInTimeADSystem.java | 260 +++++++++++++ .../prototype/core/RFunctionInvoker.java | 222 +++++++++++ .../prototype/core/TrendADSystem.java | 317 ++++++++++++++++ .../prototype/core/TrendMetric.java | 33 ++ .../methods/AnomalyDetectionTechnique.java | 32 ++ .../prototype/methods/MetricAnomaly.java | 86 +++++ .../prototype/methods/ema/EmaModel.java | 131 +++++++ .../prototype/methods/ema/EmaModelLoader.java | 46 +++ .../prototype/methods/ema/EmaTechnique.java | 151 ++++++++ .../prototype/methods/hsdev/HsdevTechnique.java | 81 ++++ .../prototype/methods/kstest/KSTechnique.java | 101 +++++ .../MetricAnomalyDetectorTestInput.java | 126 ++++++ .../testing/utilities/MetricAnomalyTester.java | 166 ++++++++ .../utilities/TestMetricSeriesGenerator.java | 92 +++++ .../utilities/TestSeriesInputRequest.java | 88 +++++ .../seriesgenerator/AbstractMetricSeries.java | 25 ++ .../seriesgenerator/DualBandMetricSeries.java | 88 +++++ .../MetricSeriesGeneratorFactory.java | 379 +++++++++++++++++++ .../seriesgenerator/MonotonicMetricSeries.java | 101 +++++ .../seriesgenerator/NormalMetricSeries.java | 81 ++++ .../SteadyWithTurbulenceMetricSeries.java | 115 ++++++ .../StepFunctionMetricSeries.java | 107 ++++++ .../seriesgenerator/UniformMetricSeries.java | 95 +++++ .../src/main/resources/R-scripts/ema.R | 96 +++++ .../src/main/resources/R-scripts/hsdev.r | 67 ++++ .../src/main/resources/R-scripts/iforest.R | 52 +++ .../src/main/resources/R-scripts/kstest.r | 38 ++ .../src/main/resources/R-scripts/test.R | 85 +++++ .../src/main/resources/R-scripts/tukeys.r | 51 +++ .../src/main/resources/input-config.properties | 42 ++ .../metrics/spark/MetricAnomalyDetector.scala | 127 +++++++ .../metrics/spark/SparkPhoenixReader.scala | 78 ++++ .../prototype/TestEmaTechnique.java | 106 ++++++ .../prototype/TestRFunctionInvoker.java | 161 ++++++++ .../alertservice/prototype/TestTukeys.java | 100 +++++ .../MetricSeriesGeneratorTest.java | 101 +++++ .../sink/timeline/RawMetricsPublisherTest.java | 4 - ambari-metrics/ambari-metrics-spark/pom.xml | 151 -------- .../metrics/spark/MetricAnomalyDetector.scala | 109 ------ .../metrics/spark/SparkPhoenixReader.scala | 88 ----- .../ambari-metrics-timelineservice/pom.xml | 6 - .../metrics/TestMetricSeriesGenerator.java | 87 ----- .../MetricAnomalyDetectorTestService.java | 87 ----- .../webapp/TimelineWebServices.java | 1 - ambari-metrics/pom.xml | 3 +- 94 files changed, 5029 insertions(+), 5215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/pom.xml b/ambari-metrics/ambari-metrics-alertservice/pom.xml deleted file mode 100644 index 4db8a6a..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/pom.xml +++ /dev/null @@ -1,149 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>ambari-metrics</artifactId> - <groupId>org.apache.ambari</groupId> - <version>2.0.0.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>ambari-metrics-alertservice</artifactId> - <version>2.0.0.0-SNAPSHOT</version> - <build> - <plugins> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - </plugins> - </build> - <name>Ambari Metrics Alert Service</name> - <packaging>jar</packaging> - - <dependencies> - - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.5</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.7.2</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.2</version> - </dependency> - - <dependency> - <groupId>com.github.lucarosellini.rJava</groupId> - <artifactId>JRI</artifactId> - <version>0.9-7</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.11</artifactId> - <version>2.1.1</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.10.1.0</version> - <exclusions> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </exclusion> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jmx</artifactId> - </exclusion> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>0.10.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>connect-json</artifactId> - <version>0.10.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-spark</artifactId> - <version>4.7.0-HBase-1.0</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_2.10</artifactId> - <version>1.3.0</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - <version>4.10</version> - </dependency> - <dependency> - <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.2.5</version> - </dependency> - </dependencies> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java deleted file mode 100644 index b98f04c..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Base64; - -public class AmbariServerInterface implements Serializable{ - - private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class); - - private String ambariServerHost; - private String clusterName; - - public AmbariServerInterface(String ambariServerHost, String clusterName) { - this.ambariServerHost = ambariServerHost; - this.clusterName = clusterName; - } - - public int getPointInTimeSensitivity() { - - String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*"); - - URL obj = null; - BufferedReader in = null; - - try { - obj = new URL(url); - HttpURLConnection con = (HttpURLConnection) obj.openConnection(); - con.setRequestMethod("GET"); - - String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8)); - con.setRequestProperty("Authorization", "Basic "+encoded); - - int responseCode = con.getResponseCode(); - LOG.info("Sending 'GET' request to URL : " + url); - LOG.info("Response Code : " + responseCode); - - in = new BufferedReader( - new InputStreamReader(con.getInputStream())); - - StringBuilder responseJsonSb = new StringBuilder(); - String line; - while ((line = in.readLine()) != null) { - responseJsonSb.append(line); - } - - JSONObject jsonObject = new JSONObject(responseJsonSb.toString()); - JSONArray array = jsonObject.getJSONArray("items"); - for(int i = 0 ; i < array.length() ; i++){ - JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition"); - if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) { - JSONObject sourceNode = alertDefn.getJSONObject("source"); - JSONArray params = sourceNode.getJSONArray("parameters"); - for(int j = 0 ; j < params.length() ; j++){ - JSONObject param = params.getJSONObject(j); - if (param.get("name").equals("sensitivity")) { - return param.getInt("value"); - } - } - break; - } - } - - } catch (Exception e) { - LOG.error(e); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - LOG.warn(e); - } - } - } - - return -1; - } - - private String constructUri(String protocol, String host, String port, String path) { - StringBuilder sb = new StringBuilder(protocol); - sb.append("://"); - sb.append(host); - sb.append(":"); - sb.append(port); - sb.append(path); - return sb.toString(); - } - -// public static void main(String[] args) { -// AmbariServerInterface ambariServerInterface = new AmbariServerInterface(); -// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1"); -// } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java deleted file mode 100644 index 490328a..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.alertservice.prototype; - -import javax.xml.bind.annotation.XmlRootElement; -import java.util.List; -import java.util.Map; - -@XmlRootElement -public class MetricAnomalyDetectorTestInput { - - public MetricAnomalyDetectorTestInput() { - } - - //Train data - private String trainDataName; - private String trainDataType; - private Map<String, String> trainDataConfigs; - private int trainDataSize; - - //Test data - private String testDataName; - private String testDataType; - private Map<String, String> testDataConfigs; - private int testDataSize; - - //Algorithm data - private List<String> methods; - private Map<String, String> methodConfigs; - - public String getTrainDataName() { - return trainDataName; - } - - public void setTrainDataName(String trainDataName) { - this.trainDataName = trainDataName; - } - - public String getTrainDataType() { - return trainDataType; - } - - public void setTrainDataType(String trainDataType) { - this.trainDataType = trainDataType; - } - - public Map<String, String> getTrainDataConfigs() { - return trainDataConfigs; - } - - public void setTrainDataConfigs(Map<String, String> trainDataConfigs) { - this.trainDataConfigs = trainDataConfigs; - } - - public String getTestDataName() { - return testDataName; - } - - public void setTestDataName(String testDataName) { - this.testDataName = testDataName; - } - - public String getTestDataType() { - return testDataType; - } - - public void setTestDataType(String testDataType) { - this.testDataType = testDataType; - } - - public Map<String, String> getTestDataConfigs() { - return testDataConfigs; - } - - public void setTestDataConfigs(Map<String, String> testDataConfigs) { - this.testDataConfigs = testDataConfigs; - } - - public Map<String, String> getMethodConfigs() { - return methodConfigs; - } - - public void setMethodConfigs(Map<String, String> methodConfigs) { - this.methodConfigs = methodConfigs; - } - - public int getTrainDataSize() { - return trainDataSize; - } - - public void setTrainDataSize(int trainDataSize) { - this.trainDataSize = trainDataSize; - } - - public int getTestDataSize() { - return testDataSize; - } - - public void setTestDataSize(int testDataSize) { - this.testDataSize = testDataSize; - } - - public List<String> getMethods() { - return methods; - } - - public void setMethods(List<String> methods) { - this.methods = methods; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java deleted file mode 100644 index bff8120..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -public class MetricAnomalyTester { - - public static String appId = MetricsCollectorInterface.serviceName; - static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class); - static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>(); - - public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException { - - long currentTime = System.currentTimeMillis(); - TimelineMetrics timelineMetrics = new TimelineMetrics(); - String hostname = InetAddress.getLocalHost().getHostName(); - - //Train data - TimelineMetric metric1 = new TimelineMetric(); - if (StringUtils.isNotEmpty(input.getTrainDataName())) { - metric1 = timelineMetricMap.get(input.getTrainDataName()); - if (metric1 == null) { - metric1 = new TimelineMetric(); - double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs()); - metric1.setMetricName(input.getTrainDataName()); - metric1.setAppId(appId); - metric1.setHostName(hostname); - metric1.setStartTime(currentTime); - metric1.setInstanceId(null); - metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries)); - timelineMetricMap.put(input.getTrainDataName(), metric1); - } - timelineMetrics.getMetrics().add(metric1); - } else { - LOG.error("No train data name specified"); - } - - //Test data - TimelineMetric metric2 = new TimelineMetric(); - if (StringUtils.isNotEmpty(input.getTestDataName())) { - metric2 = timelineMetricMap.get(input.getTestDataName()); - if (metric2 == null) { - metric2 = new TimelineMetric(); - double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs()); - metric2.setMetricName(input.getTestDataName()); - metric2.setAppId(appId); - metric2.setHostName(hostname); - metric2.setStartTime(currentTime); - metric2.setInstanceId(null); - metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries)); - timelineMetricMap.put(input.getTestDataName(), metric2); - } - timelineMetrics.getMetrics().add(metric2); - } else { - LOG.warn("No test data name specified"); - } - - //Invoke method - if (CollectionUtils.isNotEmpty(input.getMethods())) { - RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts"); - for (String methodType : input.getMethods()) { - ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs()); - TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname); - if (timelineMetric != null) { - timelineMetrics.getMetrics().add(timelineMetric); - } - } - } else { - LOG.warn("No anomaly method requested"); - } - - return timelineMetrics; - } - - - private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) { - - if (result == null) { - return null; - } - - TimelineMetric timelineMetric = new TimelineMetric(); - if (methodType.equals("tukeys") || methodType.equals("ema")) { - timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime); - timelineMetric.setHostName(hostname); - timelineMetric.setAppId(appId); - timelineMetric.setInstanceId(null); - timelineMetric.setStartTime(currentTime); - - TreeMap<Long, Double> metricValues = new TreeMap<>(); - if (result.resultset.size() > 0) { - double[] ts = result.resultset.get(0); - double[] metrics = result.resultset.get(1); - for (int i = 0; i < ts.length; i++) { - if (i == 0) { - timelineMetric.setStartTime((long) ts[i]); - } - metricValues.put((long) ts[i], metrics[i]); - } - } - timelineMetric.setMetricValues(metricValues); - return timelineMetric; - } - return null; - } - - - private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) { - - long startTime = currentTime - (values.length - 1) * 60 * 1000; - TreeMap<Long, Double> metricValues = new TreeMap<>(); - - for (int i = 0; i < values.length; i++) { - metricValues.put(startTime, values[i]); - startTime += (60 * 1000); - } - return metricValues; - } - - private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) { - - TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues(); - double[] timestamps = new double[metricValues.size()]; - double[] values = new double[metricValues.size()]; - int i = 0; - - for (Long timestamp : metricValues.keySet()) { - timestamps[i] = timestamp; - values[i++] = metricValues.get(timestamp); - } - return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java deleted file mode 100644 index 8023d15..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -public class MetricKafkaProducer { - - Producer producer; - private static String topicName = "ambari-metrics-topic"; - - public MetricKafkaProducer(String kafkaServers) { - Properties configProperties = new Properties(); - configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667" - configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); - configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer"); - producer = new KafkaProducer(configProperties); - } - - public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException { - - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics); - ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode); - Future<RecordMetadata> kafkaFuture = producer.send(rec); - - System.out.println(kafkaFuture.isDone()); - System.out.println(kafkaFuture.get().topic()); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java deleted file mode 100644 index 61b3dee..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; -import scala.Tuple2; - -import java.util.*; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class MetricSparkConsumer { - - private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class); - private static String groupId = "ambari-metrics-group"; - private static String topicName = "ambari-metrics-topic"; - private static int numThreads = 1; - private static long pitStartTime = System.currentTimeMillis(); - private static long ksStartTime = pitStartTime; - private static long hdevStartTime = ksStartTime; - private static Set<Pattern> includeMetricPatterns = new HashSet<>(); - private static Set<String> includedHosts = new HashSet<>(); - private static Set<TrendMetric> trendMetrics = new HashSet<>(); - - public MetricSparkConsumer() { - } - - public static Properties readProperties(String propertiesFile) { - try { - Properties properties = new Properties(); - InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile); - if (inputStream == null) { - inputStream = new FileInputStream(propertiesFile); - } - properties.load(inputStream); - return properties; - } catch (IOException ioEx) { - LOG.error("Error reading properties file for jmeter"); - return null; - } - } - - public static void main(String[] args) throws InterruptedException { - - if (args.length < 1) { - System.err.println("Usage: MetricSparkConsumer <input-config-file>"); - System.exit(1); - } - - Properties properties = readProperties(args[0]); - - List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(",")); - - String collectorHost = properties.getProperty("collectorHost"); - String collectorPort = properties.getProperty("collectorPort"); - String collectorProtocol = properties.getProperty("collectorProtocol"); - - String zkQuorum = properties.getProperty("zkQuorum"); - - double emaW = Double.parseDouble(properties.getProperty("emaW")); - double emaN = Double.parseDouble(properties.getProperty("emaN")); - int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold")); - double tukeysN = Double.parseDouble(properties.getProperty("tukeysN")); - - long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval")); - long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval")); - - long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval")); - long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval")); - int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp")); - long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval")); - - String ambariServerHost = properties.getProperty("ambariServerHost"); - String clusterName = properties.getProperty("clusterName"); - - String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns"); - if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) { - String[] patterns = includeMetricPatternStrings.split(","); - for (String p : patterns) { - LOG.info("Included Pattern : " + p); - includeMetricPatterns.add(Pattern.compile(p)); - } - } - - String includedHostList = properties.getProperty("hosts"); - if (includedHostList != null && !includedHostList.isEmpty()) { - String[] hosts = includedHostList.split(","); - includedHosts.addAll(Arrays.asList(hosts)); - } - - MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort); - - SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector"); - - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); - - EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold); - PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface, - tukeysN, - pitTestInterval, - pitTrainInterval, - ambariServerHost, - clusterName); - - TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface, - ksTestInterval, - ksTrainInterval, - hsdevNhp); - - Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique); - Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem); - Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem); - Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface); - Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns); - Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts); - - JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); - - //Convert JSON string to TimelineMetrics. - JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() { - @Override - public TimelineMetrics call(Tuple2<String, String> message) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class); - return metrics; - } - }); - - timelineMetricsStream.print(); - - //Group TimelineMetric by AppId. - JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair( - timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics) - ); - - appMetricStream.print(); - - //Filter AppIds that are not needed. - JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() { - @Override - public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception { - return appIds.contains(appMetricTuple._1); - } - }); - - filteredAppMetricStream.print(); - - filteredAppMetricStream.foreachRDD(rdd -> { - rdd.foreach( - tuple2 -> { - long currentTime = System.currentTimeMillis(); - EmaTechnique ema = emaTechniqueBroadcast.getValue(); - if (currentTime > pitStartTime + pitTestInterval) { - LOG.info("Running Tukeys...."); - pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime); - pitStartTime = pitStartTime + pitTestInterval; - } - - if (currentTime > ksStartTime + ksTestInterval) { - LOG.info("Running KS Test...."); - trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics); - ksStartTime = ksStartTime + ksTestInterval; - } - - if (currentTime > hdevStartTime + hsdevInterval) { - LOG.info("Running HSdev Test...."); - trendADSystemBroadcast.getValue().runHsdevMethod(); - hdevStartTime = hdevStartTime + hsdevInterval; - } - - TimelineMetrics metrics = tuple2._2(); - for (TimelineMetric timelineMetric : metrics.getMetrics()) { - - boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName()); - boolean includeMetric = false; - if (includeHost) { - if (includePatternBroadcast.getValue().isEmpty()) { - includeMetric = true; - } - for (Pattern p : includePatternBroadcast.getValue()) { - Matcher m = p.matcher(timelineMetric.getMetricName()); - if (m.find()) { - includeMetric = true; - } - } - } - - if (includeMetric) { - trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), - timelineMetric.getHostName())); - List<MetricAnomaly> anomalies = ema.test(timelineMetric); - metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); - } - } - }); - }); - - jssc.start(); - jssc.awaitTermination(); - } -} - - - - http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java deleted file mode 100644 index dab4a0a..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.codehaus.jackson.map.AnnotationIntrospector; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.codehaus.jackson.map.annotate.JsonSerialize; -import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.InetAddress; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.TreeMap; - -public class MetricsCollectorInterface implements Serializable { - - private static String hostName = null; - private String instanceId = null; - public final static String serviceName = "anomaly-engine"; - private String collectorHost; - private String protocol; - private String port; - private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; - private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class); - private static ObjectMapper mapper; - private final static ObjectReader timelineObjectReader; - - static { - mapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); - mapper.setAnnotationIntrospector(introspector); - mapper.getSerializationConfig() - .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); - timelineObjectReader = mapper.reader(TimelineMetrics.class); - } - - public MetricsCollectorInterface(String collectorHost, String protocol, String port) { - this.collectorHost = collectorHost; - this.protocol = protocol; - this.port = port; - this.hostName = getDefaultLocalHostName(); - } - - public static String getDefaultLocalHostName() { - - if (hostName != null) { - return hostName; - } - - try { - return InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - LOG.info("Error getting host address"); - } - return null; - } - - public void publish(List<MetricAnomaly> metricAnomalies) { - if (CollectionUtils.isNotEmpty(metricAnomalies)) { - LOG.info("Sending metric anomalies of size : " + metricAnomalies.size()); - List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies); - if (!metricList.isEmpty()) { - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(metricList); - emitMetrics(timelineMetrics); - } - } else { - LOG.debug("No anomalies to send."); - } - } - - private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) { - List<TimelineMetric> metrics = new ArrayList<>(); - - if (metricAnomalies.isEmpty()) { - return metrics; - } - - for (MetricAnomaly anomaly : metricAnomalies) { - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(anomaly.getMetricKey()); - timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType()); - timelineMetric.setInstanceId(null); - timelineMetric.setHostName(getDefaultLocalHostName()); - timelineMetric.setStartTime(anomaly.getTimestamp()); - HashMap<String, String> metadata = new HashMap<>(); - metadata.put("method", anomaly.getMethodType()); - metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore())); - timelineMetric.setMetadata(metadata); - TreeMap<Long,Double> metricValues = new TreeMap<>(); - metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue()); - timelineMetric.setMetricValues(metricValues); - - metrics.add(timelineMetric); - } - return metrics; - } - - public boolean emitMetrics(TimelineMetrics metrics) { - String connectUrl = constructTimelineMetricUri(); - String jsonData = null; - LOG.debug("EmitMetrics connectUrl = " + connectUrl); - try { - jsonData = mapper.writeValueAsString(metrics); - LOG.info(jsonData); - } catch (IOException e) { - LOG.error("Unable to parse metrics", e); - } - if (jsonData != null) { - return emitMetricsJson(connectUrl, jsonData); - } - return false; - } - - private HttpURLConnection getConnection(String spec) throws IOException { - return (HttpURLConnection) new URL(spec).openConnection(); - } - - private boolean emitMetricsJson(String connectUrl, String jsonData) { - int timeout = 10000; - HttpURLConnection connection = null; - try { - if (connectUrl == null) { - throw new IOException("Unknown URL. Unable to connect to metrics collector."); - } - connection = getConnection(connectUrl); - - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Connection", "Keep-Alive"); - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - connection.setDoOutput(true); - - if (jsonData != null) { - try (OutputStream os = connection.getOutputStream()) { - os.write(jsonData.getBytes("UTF-8")); - } - } - - int statusCode = connection.getResponseCode(); - - if (statusCode != 200) { - LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + - "statusCode = " + statusCode); - } else { - LOG.info("Metrics posted to Collector " + connectUrl); - } - return true; - } catch (IOException ioe) { - LOG.error(ioe.getMessage()); - } - return false; - } - - private String constructTimelineMetricUri() { - StringBuilder sb = new StringBuilder(protocol); - sb.append("://"); - sb.append(collectorHost); - sb.append(":"); - sb.append(port); - sb.append(WS_V1_TIMELINE_METRICS); - return sb.toString(); - } - - public TimelineMetrics fetchMetrics(String metricName, - String appId, - String hostname, - long startime, - long endtime) { - - String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId + - "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime; - LOG.debug("Fetch metrics URL : " + url); - - URL obj = null; - BufferedReader in = null; - TimelineMetrics timelineMetrics = new TimelineMetrics(); - - try { - obj = new URL(url); - HttpURLConnection con = (HttpURLConnection) obj.openConnection(); - con.setRequestMethod("GET"); - int responseCode = con.getResponseCode(); - LOG.debug("Sending 'GET' request to URL : " + url); - LOG.debug("Response Code : " + responseCode); - - in = new BufferedReader( - new InputStreamReader(con.getInputStream())); - timelineMetrics = timelineObjectReader.readValue(in); - } catch (Exception e) { - LOG.error(e); - } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - LOG.warn(e); - } - } - } - - LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics."); - return timelineMetrics; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java deleted file mode 100644 index b3e7bd3..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel; -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -public class PointInTimeADSystem implements Serializable { - - //private EmaTechnique emaTechnique; - private MetricsCollectorInterface metricsCollectorInterface; - private Map<String, Double> tukeysNMap; - private double defaultTukeysN = 3; - - private long testIntervalMillis = 5*60*1000; //10mins - private long trainIntervalMillis = 15*60*1000; //1hour - - private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class); - - private AmbariServerInterface ambariServerInterface; - private int sensitivity = 50; - private int minSensitivity = 0; - private int maxSensitivity = 100; - - public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN, - long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) { - this.metricsCollectorInterface = metricsCollectorInterface; - this.defaultTukeysN = defaultTukeysN; - this.tukeysNMap = new HashMap<>(); - this.testIntervalMillis = testIntervalMillis; - this.trainIntervalMillis = trainIntervalMillis; - this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName); - LOG.info("Starting PointInTimeADSystem..."); - } - - public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) { - LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]"); - - int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity(); - if (requiredSensivity == -1 || requiredSensivity == sensitivity) { - LOG.info("No change in sensitivity needed."); - } else { - LOG.info("Current tukey's N value = " + defaultTukeysN); - if (requiredSensivity > sensitivity) { - int targetSensitivity = Math.min(maxSensitivity, requiredSensivity); - while (sensitivity < targetSensitivity) { - defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05; - sensitivity++; - } - } else { - int targetSensitivity = Math.max(minSensitivity, requiredSensivity); - while (sensitivity > targetSensitivity) { - defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05; - sensitivity--; - } - } - LOG.info("New tukey's N value = " + defaultTukeysN); - } - - TimelineMetrics timelineMetrics = new TimelineMetrics(); - for (String metricKey : emaTechnique.getTrackedEmas().keySet()) { - LOG.info("EMA key = " + metricKey); - EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey); - String metricName = emaModel.getMetricName(); - String appId = emaModel.getAppId(); - String hostname = emaModel.getHostname(); - - TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis), - startTime); - - if (tukeysData.getMetrics().isEmpty()) { - LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey); - continue; - } - - List<Double> trainTsList = new ArrayList<>(); - List<Double> trainDataList = new ArrayList<>(); - List<Double> testTsList = new ArrayList<>(); - List<Double> testDataList = new ArrayList<>(); - - for (TimelineMetric metric : tukeysData.getMetrics()) { - for (Long timestamp : metric.getMetricValues().keySet()) { - if (timestamp <= (startTime - testIntervalMillis)) { - trainDataList.add(metric.getMetricValues().get(timestamp)); - trainTsList.add((double)timestamp); - } else { - testDataList.add(metric.getMetricValues().get(timestamp)); - testTsList.add((double)timestamp); - } - } - } - - if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { - LOG.info("Not enough train/test data to perform analysis."); - continue; - } - - String tukeysTrainSeries = "tukeysTrainSeries"; - double[] trainTs = new double[trainTsList.size()]; - double[] trainData = new double[trainTsList.size()]; - for (int i = 0; i < trainTs.length; i++) { - trainTs[i] = trainTsList.get(i); - trainData[i] = trainDataList.get(i); - } - - String tukeysTestSeries = "tukeysTestSeries"; - double[] testTs = new double[testTsList.size()]; - double[] testData = new double[testTsList.size()]; - for (int i = 0; i < testTs.length; i++) { - testTs[i] = testTsList.get(i); - testData[i] = testDataList.get(i); - } - - LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); - - DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData); - DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData); - - if (!tukeysNMap.containsKey(metricKey)) { - tukeysNMap.put(metricKey, defaultTukeysN); - } - - Map<String, String> configs = new HashMap<>(); - configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey))); - - ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs); - - List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname); - LOG.info("Tukeys anomalies size : " + tukeysMetrics.size()); - TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>(); - - for (TimelineMetric tukeysMetric : tukeysMetrics) { - tukeysMetricValues.putAll(tukeysMetric.getMetricValues()); - timelineMetrics.addOrMergeTimelineMetric(tukeysMetric); - } - - TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime); - TreeMap<Long, Double> emaMetricValues = new TreeMap(); - if (!emaData.getMetrics().isEmpty()) { - emaMetricValues = emaData.getMetrics().get(0).getMetricValues(); - } - - LOG.info("Ema anomalies size : " + emaMetricValues.size()); - int tp = 0; - int tn = 0; - int fp = 0; - int fn = 0; - - for (double ts : testTs) { - long timestamp = (long) ts; - if (tukeysMetricValues.containsKey(timestamp)) { - if (emaMetricValues.containsKey(timestamp)) { - tp++; - } else { - fn++; - } - } else { - if (emaMetricValues.containsKey(timestamp)) { - fp++; - } else { - tn++; - } - } - } - - double recall = (double) tp / (double) (tp + fn); - double precision = (double) tp / (double) (tp + fp); - LOG.info("----------------------------"); - LOG.info("Precision Recall values for " + metricKey); - LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn); - LOG.info("----------------------------"); - - if (recall < 0.5) { - LOG.info("Increasing EMA sensitivity by 10%"); - emaModel.updateModel(true, 5); - } else if (precision < 0.5) { - LOG.info("Decreasing EMA sensitivity by 10%"); - emaModel.updateModel(false, 5); - } - - } - - if (emaTechnique.getTrackedEmas().isEmpty()){ - LOG.info("No EMA Technique keys tracked!!!!"); - } - - if (!timelineMetrics.getMetrics().isEmpty()) { - metricsCollectorInterface.emitMetrics(timelineMetrics); - } - } - - private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) { - - List<TimelineMetric> timelineMetrics = new ArrayList<>(); - - if (result == null) { - LOG.info("ResultSet from R call is null!!"); - return null; - } - - if (result.resultset.size() > 0) { - double[] ts = result.resultset.get(0); - double[] metrics = result.resultset.get(1); - double[] anomalyScore = result.resultset.get(2); - for (int i = 0; i < ts.length; i++) { - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname); - timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); - timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys"); - timelineMetric.setInstanceId(null); - timelineMetric.setStartTime((long) ts[i]); - TreeMap<Long, Double> metricValues = new TreeMap<>(); - metricValues.put((long) ts[i], metrics[i]); - - HashMap<String, String> metadata = new HashMap<>(); - metadata.put("method", "tukeys"); - if (String.valueOf(anomalyScore[i]).equals("infinity")) { - LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname); - } else { - metadata.put("anomaly-score", String.valueOf(anomalyScore[i])); - } - timelineMetric.setMetadata(metadata); - - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - } - } - - return timelineMetrics; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java deleted file mode 100644 index 4fdf27d..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - - -import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.rosuda.JRI.REXP; -import org.rosuda.JRI.RVector; -import org.rosuda.JRI.Rengine; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class RFunctionInvoker { - - static final Log LOG = LogFactory.getLog(RFunctionInvoker.class); - public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null); - private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts"; - - private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) { - r.assign("train_ts", trainData.ts); - r.assign("train_x", trainData.values); - r.eval("train_data <- data.frame(train_ts,train_x)"); - r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")"); - - r.assign("test_ts", testData.ts); - r.assign("test_x", testData.values); - r.eval("test_data <- data.frame(test_ts,test_x)"); - r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")"); - } - - public static void setScriptsDir(String dir) { - rScriptDir = dir; - } - - public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) { - - ResultSet result; - switch (methodType) { - case "tukeys": - result = tukeys(trainData, testData, configs); - break; - case "ema": - result = ema_global(trainData, testData, configs); - break; - case "ks": - result = ksTest(trainData, testData, configs); - break; - case "hsdev": - result = hsdev(trainData, testData, configs); - break; - default: - result = tukeys(trainData, testData, configs); - break; - } - return result; - } - - public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - - REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')"); - - double n = Double.parseDouble(configs.get("tukeys.n")); - r.eval("n <- " + n); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ams_tukeys(train_data, test_data, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/ema.r" + "')"); - - int n = Integer.parseInt(configs.get("ema.n")); - r.eval("n <- " + n); - - double w = Double.parseDouble(configs.get("ema.w")); - r.eval("w <- " + w); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ema_global(train_data, test_data, w, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/ema.r" + "')"); - - int n = Integer.parseInt(configs.get("ema.n")); - r.eval("n <- " + n); - - double w = Double.parseDouble(configs.get("ema.w")); - r.eval("w <- " + w); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ema_daily(train_data, test_data, w, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/kstest.r" + "')"); - - double p_value = Double.parseDouble(configs.get("ks.p_value")); - r.eval("p_value <- " + p_value); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ams_ks(train_data, test_data, p_value)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/hsdev.r" + "')"); - - int n = Integer.parseInt(configs.get("hsdev.n")); - r.eval("n <- " + n); - - int nhp = Integer.parseInt(configs.get("hsdev.nhp")); - r.eval("nhp <- " + nhp); - - long interval = Long.parseLong(configs.get("hsdev.interval")); - r.eval("interval <- " + interval); - - long period = Long.parseLong(configs.get("hsdev.period")); - r.eval("period <- " + period); - - loadDataSets(r, trainData, testData); - - r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)"); - REXP exp = r.eval("an2"); - RVector cont = (RVector) exp.getContent(); - - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java deleted file mode 100644 index 7485f01..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; - -import javax.xml.bind.annotation.XmlRootElement; -import java.util.Collections; -import java.util.Map; - -@XmlRootElement -public class TestSeriesInputRequest { - - private String seriesName; - private String seriesType; - private Map<String, String> configs; - - public TestSeriesInputRequest() { - } - - public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) { - this.seriesName = seriesName; - this.seriesType = seriesType; - this.configs = configs; - } - - public String getSeriesName() { - return seriesName; - } - - public void setSeriesName(String seriesName) { - this.seriesName = seriesName; - } - - public String getSeriesType() { - return seriesType; - } - - public void setSeriesType(String seriesType) { - this.seriesType = seriesType; - } - - public Map<String, String> getConfigs() { - return configs; - } - - public void setConfigs(Map<String, String> configs) { - this.configs = configs; - } - - @Override - public boolean equals(Object o) { - TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o; - return anotherInput.getSeriesName().equals(this.getSeriesName()); - } - - @Override - public int hashCode() { - return seriesName.hashCode(); - } - - public static void main(String[] args) { - - ObjectMapper objectMapper = new ObjectMapper(); - TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value")); - try { - System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest)); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - } -}
