AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (Commit 2) (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4613b471 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4613b471 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4613b471 Branch: refs/heads/branch-3.0-ams Commit: 4613b471e20257df7f1f732e9444d8a90c71d743 Parents: e33b545 Author: Aravindan Vijayan <[email protected]> Authored: Wed Sep 27 15:02:56 2017 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Wed Sep 27 15:02:56 2017 -0700 ---------------------------------------------------------------------- .../pom.xml | 205 ++++++++++ .../adservice/prototype/common/DataSeries.java | 38 ++ .../adservice/prototype/common/ResultSet.java | 43 +++ .../prototype/common/StatisticUtils.java | 62 +++ .../prototype/core/AmbariServerInterface.java | 121 ++++++ .../prototype/core/MetricKafkaProducer.java | 56 +++ .../prototype/core/MetricSparkConsumer.java | 239 ++++++++++++ .../core/MetricsCollectorInterface.java | 237 ++++++++++++ .../prototype/core/PointInTimeADSystem.java | 260 +++++++++++++ .../prototype/core/RFunctionInvoker.java | 222 +++++++++++ .../adservice/prototype/core/TrendADSystem.java | 317 ++++++++++++++++ .../adservice/prototype/core/TrendMetric.java | 33 ++ .../methods/AnomalyDetectionTechnique.java | 30 ++ .../prototype/methods/MetricAnomaly.java | 86 +++++ .../prototype/methods/ema/EmaModel.java | 131 +++++++ .../prototype/methods/ema/EmaModelLoader.java | 40 ++ .../prototype/methods/ema/EmaTechnique.java | 151 ++++++++ .../prototype/methods/hsdev/HsdevTechnique.java | 81 ++++ .../prototype/methods/kstest/KSTechnique.java | 101 +++++ .../MetricAnomalyDetectorTestInput.java | 126 ++++++ .../testing/utilities/MetricAnomalyTester.java | 168 ++++++++ .../utilities/TestMetricSeriesGenerator.java | 92 +++++ .../utilities/TestSeriesInputRequest.java | 88 +++++ .../src/main/resources/R-scripts/ema.R | 96 +++++ .../src/main/resources/R-scripts/hsdev.r | 67 ++++ 
.../src/main/resources/R-scripts/iforest.R | 52 +++ .../src/main/resources/R-scripts/kstest.r | 38 ++ .../src/main/resources/R-scripts/test.R | 85 +++++ .../src/main/resources/R-scripts/tukeys.r | 51 +++ .../src/main/resources/input-config.properties | 42 ++ .../spark/prototype/MetricAnomalyDetector.scala | 126 ++++++ .../spark/prototype/SparkPhoenixReader.scala | 78 ++++ .../adservice/prototype/TestEmaTechnique.java | 106 ++++++ .../prototype/TestRFunctionInvoker.java | 161 ++++++++ .../metrics/adservice/prototype/TestTukeys.java | 100 +++++ .../seriesgenerator/AbstractMetricSeries.java | 25 ++ .../seriesgenerator/DualBandMetricSeries.java | 88 +++++ .../MetricSeriesGeneratorFactory.java | 377 ++++++++++++++++++ .../MetricSeriesGeneratorTest.java | 101 +++++ .../seriesgenerator/MonotonicMetricSeries.java | 101 +++++ .../seriesgenerator/NormalMetricSeries.java | 81 ++++ .../SteadyWithTurbulenceMetricSeries.java | 115 ++++++ .../StepFunctionMetricSeries.java | 107 ++++++ .../seriesgenerator/UniformMetricSeries.java | 95 +++++ .../ambari-metrics-anomaly-detector/pom.xml | 205 ---------- .../prototype/common/DataSeries.java | 38 -- .../prototype/common/ResultSet.java | 43 --- .../prototype/common/StatisticUtils.java | 62 --- .../prototype/core/AmbariServerInterface.java | 121 ------ .../prototype/core/MetricKafkaProducer.java | 56 --- .../prototype/core/MetricSparkConsumer.java | 239 ------------ .../core/MetricsCollectorInterface.java | 237 ------------ .../prototype/core/PointInTimeADSystem.java | 260 ------------- .../prototype/core/RFunctionInvoker.java | 222 ----------- .../prototype/core/TrendADSystem.java | 317 ---------------- .../prototype/core/TrendMetric.java | 33 -- .../methods/AnomalyDetectionTechnique.java | 32 -- .../prototype/methods/MetricAnomaly.java | 86 ----- .../prototype/methods/ema/EmaModel.java | 131 ------- .../prototype/methods/ema/EmaModelLoader.java | 46 --- .../prototype/methods/ema/EmaTechnique.java | 151 -------- 
.../prototype/methods/hsdev/HsdevTechnique.java | 81 ---- .../prototype/methods/kstest/KSTechnique.java | 101 ----- .../MetricAnomalyDetectorTestInput.java | 126 ------ .../testing/utilities/MetricAnomalyTester.java | 166 -------- .../utilities/TestMetricSeriesGenerator.java | 92 ----- .../utilities/TestSeriesInputRequest.java | 88 ----- .../seriesgenerator/AbstractMetricSeries.java | 25 -- .../seriesgenerator/DualBandMetricSeries.java | 88 ----- .../MetricSeriesGeneratorFactory.java | 379 ------------------- .../seriesgenerator/MonotonicMetricSeries.java | 101 ----- .../seriesgenerator/NormalMetricSeries.java | 81 ---- .../SteadyWithTurbulenceMetricSeries.java | 115 ------ .../StepFunctionMetricSeries.java | 107 ------ .../seriesgenerator/UniformMetricSeries.java | 95 ----- .../src/main/resources/R-scripts/ema.R | 96 ----- .../src/main/resources/R-scripts/hsdev.r | 67 ---- .../src/main/resources/R-scripts/iforest.R | 52 --- .../src/main/resources/R-scripts/kstest.r | 38 -- .../src/main/resources/R-scripts/test.R | 85 ----- .../src/main/resources/R-scripts/tukeys.r | 51 --- .../src/main/resources/input-config.properties | 42 -- .../metrics/spark/MetricAnomalyDetector.scala | 127 ------- .../metrics/spark/SparkPhoenixReader.scala | 78 ---- .../prototype/TestEmaTechnique.java | 106 ------ .../prototype/TestRFunctionInvoker.java | 161 -------- .../alertservice/prototype/TestTukeys.java | 100 ----- .../MetricSeriesGeneratorTest.java | 101 ----- ambari-metrics/pom.xml | 2 +- 89 files changed, 5020 insertions(+), 5029 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml new file mode 100644 index 
0000000..1a10f86 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -0,0 +1,205 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.0.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-anomaly-detection-service</artifactId> + <version>2.0.0.0-SNAPSHOT</version> + <properties> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.11</scala.binary.version> + </properties> + + <repositories> + <repository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </repository> + </repositories> + + <pluginRepositories> + <pluginRepository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </pluginRepository> + 
</pluginRepositories> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-target:jvm-1.5</arg> + </args> + </configuration> + </plugin> + </plugins> + </build> + <name>Ambari Metrics Anomaly Detection Service</name> + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.5</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>com.github.lucarosellini.rJava</groupId> + <artifactId>JRI</artifactId> + <version>0.9-7</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.11</artifactId> + <version>2.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jmx</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> 
+ </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-json</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + <version>4.10.0-HBase-1.1</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.10</version> + </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.2.5</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>2.1.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_${scala.binary.version}</artifactId> + <version>2.1.1</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java new file mode 100644 index 0000000..54b402f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.common; + +import java.util.Arrays; + +public class DataSeries { + + public String seriesName; + public double[] ts; + public double[] values; + + public DataSeries(String seriesName, double[] ts, double[] values) { + this.seriesName = seriesName; + this.ts = ts; + this.values = values; + } + + @Override + public String toString() { + return seriesName + Arrays.toString(ts) + Arrays.toString(values); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java new file mode 100644 index 0000000..dd3038f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.common; + + +import java.util.ArrayList; +import java.util.List; + +public class ResultSet { + + public List<double[]> resultset = new ArrayList<>(); + + public ResultSet(List<double[]> resultset) { + this.resultset = resultset; + } + + public void print() { + System.out.println("Result : "); + if (!resultset.isEmpty()) { + for (int i = 0; i<resultset.get(0).length;i++) { + for (double[] entity : resultset) { + System.out.print(entity[i] + " "); + } + System.out.println(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java new file mode 100644 index 0000000..7f0aed3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.common; + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public class StatisticUtils { + + public static double mean(double[] values) { + double sum = 0; + for (double d : values) { + sum += d; + } + return sum / values.length; + } + + public static double variance(double[] values) { + double avg = mean(values); + double variance = 0; + for (double d : values) { + variance += Math.pow(d - avg, 2.0); + } + return variance; + } + + public static double sdev(double[] values, boolean useBesselsCorrection) { + double variance = variance(values); + int n = (useBesselsCorrection) ? 
values.length - 1 : values.length; + return Math.sqrt(variance / n); + } + + public static double median(double[] values) { + double[] clonedValues = Arrays.copyOf(values, values.length); + Arrays.sort(clonedValues); + int n = values.length; + + if (n % 2 != 0) { + return clonedValues[(n-1)/2]; + } else { + return ( clonedValues[(n-1)/2] + clonedValues[n/2] ) / 2; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java new file mode 100644 index 0000000..920d758 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.prototype.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class AmbariServerInterface implements Serializable{ + + private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class); + + private String ambariServerHost; + private String clusterName; + + public AmbariServerInterface(String ambariServerHost, String clusterName) { + this.ambariServerHost = ambariServerHost; + this.clusterName = clusterName; + } + + public int getPointInTimeSensitivity() { + + String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*"); + + URL obj = null; + BufferedReader in = null; + + try { + obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + + String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8)); + con.setRequestProperty("Authorization", "Basic "+encoded); + + int responseCode = con.getResponseCode(); + LOG.info("Sending 'GET' request to URL : " + url); + LOG.info("Response Code : " + responseCode); + + in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + + StringBuilder responseJsonSb = new StringBuilder(); + String line; + while ((line = in.readLine()) != null) { + responseJsonSb.append(line); + } + + JSONObject jsonObject = new JSONObject(responseJsonSb.toString()); + JSONArray array = 
jsonObject.getJSONArray("items"); + for(int i = 0 ; i < array.length() ; i++){ + JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition"); + if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) { + JSONObject sourceNode = alertDefn.getJSONObject("source"); + JSONArray params = sourceNode.getJSONArray("parameters"); + for(int j = 0 ; j < params.length() ; j++){ + JSONObject param = params.getJSONObject(j); + if (param.get("name").equals("sensitivity")) { + return param.getInt("value"); + } + } + break; + } + } + + } catch (Exception e) { + LOG.error(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + return -1; + } + + private String constructUri(String protocol, String host, String port, String path) { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append(path); + return sb.toString(); + } + +// public static void main(String[] args) { +// AmbariServerInterface ambariServerInterface = new AmbariServerInterface(); +// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1"); +// } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java new file mode 100644 index 0000000..167fbb3 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.core; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class MetricKafkaProducer { + + Producer producer; + private static String topicName = "ambari-metrics-topic"; + + public MetricKafkaProducer(String kafkaServers) { + Properties configProperties = new Properties(); + configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667" + 
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); + configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer"); + producer = new KafkaProducer(configProperties); + } + + public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException { + + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics); + ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode); + Future<RecordMetadata> kafkaFuture = producer.send(rec); + + System.out.println(kafkaFuture.isDone()); + System.out.println(kafkaFuture.get().topic()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java new file mode 100644 index 0000000..e8257e5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; +import scala.Tuple2; + +import java.util.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MetricSparkConsumer { + + private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class); + private static String groupId = "ambari-metrics-group"; + private static String topicName = "ambari-metrics-topic"; + private static int numThreads = 1; + private static long pitStartTime = System.currentTimeMillis(); + private static long ksStartTime = 
pitStartTime; + private static long hdevStartTime = ksStartTime; + private static Set<Pattern> includeMetricPatterns = new HashSet<>(); + private static Set<String> includedHosts = new HashSet<>(); + private static Set<TrendMetric> trendMetrics = new HashSet<>(); + + public MetricSparkConsumer() { + } + + public static Properties readProperties(String propertiesFile) { + try { + Properties properties = new Properties(); + InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile); + if (inputStream == null) { + inputStream = new FileInputStream(propertiesFile); + } + properties.load(inputStream); + return properties; + } catch (IOException ioEx) { + LOG.error("Error reading properties file for jmeter"); + return null; + } + } + + public static void main(String[] args) throws InterruptedException { + + if (args.length < 1) { + System.err.println("Usage: MetricSparkConsumer <input-config-file>"); + System.exit(1); + } + + Properties properties = readProperties(args[0]); + + List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(",")); + + String collectorHost = properties.getProperty("collectorHost"); + String collectorPort = properties.getProperty("collectorPort"); + String collectorProtocol = properties.getProperty("collectorProtocol"); + + String zkQuorum = properties.getProperty("zkQuorum"); + + double emaW = Double.parseDouble(properties.getProperty("emaW")); + double emaN = Double.parseDouble(properties.getProperty("emaN")); + int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold")); + double tukeysN = Double.parseDouble(properties.getProperty("tukeysN")); + + long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval")); + long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval")); + + long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval")); + long ksTrainInterval = 
Long.parseLong(properties.getProperty("ksTrainInterval")); + int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp")); + long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval")); + + String ambariServerHost = properties.getProperty("ambariServerHost"); + String clusterName = properties.getProperty("clusterName"); + + String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns"); + if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) { + String[] patterns = includeMetricPatternStrings.split(","); + for (String p : patterns) { + LOG.info("Included Pattern : " + p); + includeMetricPatterns.add(Pattern.compile(p)); + } + } + + String includedHostList = properties.getProperty("hosts"); + if (includedHostList != null && !includedHostList.isEmpty()) { + String[] hosts = includedHostList.split(","); + includedHosts.addAll(Arrays.asList(hosts)); + } + + MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort); + + SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector"); + + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); + + EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold); + PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface, + tukeysN, + pitTestInterval, + pitTrainInterval, + ambariServerHost, + clusterName); + + TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface, + ksTestInterval, + ksTrainInterval, + hsdevNhp); + + Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique); + Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem); + Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem); + Broadcast<MetricsCollectorInterface> 
metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface); + Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns); + Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts); + + JavaPairReceiverInputDStream<String, String> messages = + KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); + + //Convert JSON string to TimelineMetrics. + JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() { + @Override + public TimelineMetrics call(Tuple2<String, String> message) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class); + return metrics; + } + }); + + timelineMetricsStream.print(); + + //Group TimelineMetric by AppId. + JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair( + timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics) + ); + + appMetricStream.print(); + + //Filter AppIds that are not needed. 
+ JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() { + @Override + public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception { + return appIds.contains(appMetricTuple._1); + } + }); + + filteredAppMetricStream.print(); + + filteredAppMetricStream.foreachRDD(rdd -> { + rdd.foreach( + tuple2 -> { + long currentTime = System.currentTimeMillis(); + EmaTechnique ema = emaTechniqueBroadcast.getValue(); + if (currentTime > pitStartTime + pitTestInterval) { + LOG.info("Running Tukeys...."); + pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime); + pitStartTime = pitStartTime + pitTestInterval; + } + + if (currentTime > ksStartTime + ksTestInterval) { + LOG.info("Running KS Test...."); + trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics); + ksStartTime = ksStartTime + ksTestInterval; + } + + if (currentTime > hdevStartTime + hsdevInterval) { + LOG.info("Running HSdev Test...."); + trendADSystemBroadcast.getValue().runHsdevMethod(); + hdevStartTime = hdevStartTime + hsdevInterval; + } + + TimelineMetrics metrics = tuple2._2(); + for (TimelineMetric timelineMetric : metrics.getMetrics()) { + + boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName()); + boolean includeMetric = false; + if (includeHost) { + if (includePatternBroadcast.getValue().isEmpty()) { + includeMetric = true; + } + for (Pattern p : includePatternBroadcast.getValue()) { + Matcher m = p.matcher(timelineMetric.getMetricName()); + if (m.find()) { + includeMetric = true; + } + } + } + + if (includeMetric) { + trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), + timelineMetric.getHostName())); + List<MetricAnomaly> anomalies = ema.test(timelineMetric); + metricsCollectorInterfaceBroadcast.getValue().publish(anomalies); + } + } + }); + }); + + jssc.start(); + 
jssc.awaitTermination(); + } +} + + + + http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java new file mode 100644 index 0000000..da3999a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.core; + +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeMap; + +public class MetricsCollectorInterface implements Serializable { + + private static String hostName = null; + private String instanceId = null; + public final static String serviceName = "anomaly-engine"; + private String collectorHost; + private String protocol; + private String port; + private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; + private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class); + private static ObjectMapper mapper; + private final static ObjectReader timelineObjectReader; + + static { + mapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + mapper.setAnnotationIntrospector(introspector); + mapper.getSerializationConfig() + .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + timelineObjectReader = mapper.reader(TimelineMetrics.class); + } + + public 
MetricsCollectorInterface(String collectorHost, String protocol, String port) { + this.collectorHost = collectorHost; + this.protocol = protocol; + this.port = port; + this.hostName = getDefaultLocalHostName(); + } + + public static String getDefaultLocalHostName() { + + if (hostName != null) { + return hostName; + } + + try { + return InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.info("Error getting host address"); + } + return null; + } + + public void publish(List<MetricAnomaly> metricAnomalies) { + if (CollectionUtils.isNotEmpty(metricAnomalies)) { + LOG.info("Sending metric anomalies of size : " + metricAnomalies.size()); + List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies); + if (!metricList.isEmpty()) { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + emitMetrics(timelineMetrics); + } + } else { + LOG.debug("No anomalies to send."); + } + } + + private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) { + List<TimelineMetric> metrics = new ArrayList<>(); + + if (metricAnomalies.isEmpty()) { + return metrics; + } + + for (MetricAnomaly anomaly : metricAnomalies) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(anomaly.getMetricKey()); + timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType()); + timelineMetric.setInstanceId(null); + timelineMetric.setHostName(getDefaultLocalHostName()); + timelineMetric.setStartTime(anomaly.getTimestamp()); + HashMap<String, String> metadata = new HashMap<>(); + metadata.put("method", anomaly.getMethodType()); + metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore())); + timelineMetric.setMetadata(metadata); + TreeMap<Long,Double> metricValues = new TreeMap<>(); + metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue()); + timelineMetric.setMetricValues(metricValues); + + 
metrics.add(timelineMetric); + } + return metrics; + } + + public boolean emitMetrics(TimelineMetrics metrics) { + String connectUrl = constructTimelineMetricUri(); + String jsonData = null; + LOG.debug("EmitMetrics connectUrl = " + connectUrl); + try { + jsonData = mapper.writeValueAsString(metrics); + LOG.info(jsonData); + } catch (IOException e) { + LOG.error("Unable to parse metrics", e); + } + if (jsonData != null) { + return emitMetricsJson(connectUrl, jsonData); + } + return false; + } + + private HttpURLConnection getConnection(String spec) throws IOException { + return (HttpURLConnection) new URL(spec).openConnection(); + } + + private boolean emitMetricsJson(String connectUrl, String jsonData) { + int timeout = 10000; + HttpURLConnection connection = null; + try { + if (connectUrl == null) { + throw new IOException("Unknown URL. Unable to connect to metrics collector."); + } + connection = getConnection(connectUrl); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Connection", "Keep-Alive"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setDoOutput(true); + + if (jsonData != null) { + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonData.getBytes("UTF-8")); + } + } + + int statusCode = connection.getResponseCode(); + + if (statusCode != 200) { + LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + + "statusCode = " + statusCode); + } else { + LOG.info("Metrics posted to Collector " + connectUrl); + } + return true; + } catch (IOException ioe) { + LOG.error(ioe.getMessage()); + } + return false; + } + + private String constructTimelineMetricUri() { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(collectorHost); + sb.append(":"); + sb.append(port); + sb.append(WS_V1_TIMELINE_METRICS); + return sb.toString(); + } + + public TimelineMetrics 
fetchMetrics(String metricName, + String appId, + String hostname, + long startime, + long endtime) { + + String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId + + "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime; + LOG.debug("Fetch metrics URL : " + url); + + URL obj = null; + BufferedReader in = null; + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + try { + obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + int responseCode = con.getResponseCode(); + LOG.debug("Sending 'GET' request to URL : " + url); + LOG.debug("Response Code : " + responseCode); + + in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + timelineMetrics = timelineObjectReader.readValue(in); + } catch (Exception e) { + LOG.error(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics."); + return timelineMetrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java new file mode 100644 index 0000000..0a2271a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.adservice.prototype.core;

import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Point-in-time anomaly detection driver. Periodically runs Tukeys (via R) over
 * each metric tracked by the EMA technique, compares Tukeys anomalies against
 * EMA anomalies (precision/recall), and tunes the EMA model and the Tukeys N
 * parameter based on the operator-requested sensitivity from Ambari Server.
 */
public class PointInTimeADSystem implements Serializable {

  //private EmaTechnique emaTechnique;
  private MetricsCollectorInterface metricsCollectorInterface;
  // Per-metric-key override of Tukeys N; seeded from defaultTukeysN on first use.
  private Map<String, Double> tukeysNMap;
  private double defaultTukeysN = 3;

  // Defaults only; both are overwritten by the constructor arguments.
  private long testIntervalMillis = 5*60*1000; //5mins (comment fixed: was "10mins")
  private long trainIntervalMillis = 15*60*1000; //15mins (comment fixed: was "1hour")

  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);

  private AmbariServerInterface ambariServerInterface;
  // Sensitivity is a 0..100 dial; each step scales Tukeys N by +/-5%.
  private int sensitivity = 50;
  private int minSensitivity = 0;
  private int maxSensitivity = 100;

  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
    this.metricsCollectorInterface = metricsCollectorInterface;
    this.defaultTukeysN = defaultTukeysN;
    this.tukeysNMap = new HashMap<>();
    this.testIntervalMillis = testIntervalMillis;
    this.trainIntervalMillis = trainIntervalMillis;
    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
    LOG.info("Starting PointInTimeADSystem...");
  }

  /**
   * Runs one Tukeys pass ending at {@code startTime}: adjusts global Tukeys N to
   * the requested sensitivity, evaluates each tracked EMA metric against Tukeys
   * results, emits Tukeys anomalies, and nudges each EMA model's sensitivity
   * based on its precision/recall versus Tukeys.
   */
  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");

    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
      LOG.info("No change in sensitivity needed.");
    } else {
      LOG.info("Current tukey's N value = " + defaultTukeysN);
      if (requiredSensivity > sensitivity) {
        int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
        while (sensitivity < targetSensitivity) {
          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
          sensitivity++;
        }
      } else {
        int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
        while (sensitivity > targetSensitivity) {
          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
          sensitivity--;
        }
      }
      LOG.info("New tukey's N value = " + defaultTukeysN);
    }

    TimelineMetrics timelineMetrics = new TimelineMetrics();
    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
      LOG.info("EMA key = " + metricKey);
      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
      String metricName = emaModel.getMetricName();
      String appId = emaModel.getAppId();
      String hostname = emaModel.getHostname();

      // Fetch train + test window in one call; split below on the boundary.
      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
        startTime);

      if (tukeysData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();

      for (TimelineMetric metric : tukeysData.getMetrics()) {
        for (Long timestamp : metric.getMetricValues().keySet()) {
          if (timestamp <= (startTime - testIntervalMillis)) {
            trainDataList.add(metric.getMetricValues().get(timestamp));
            trainTsList.add((double)timestamp);
          } else {
            testDataList.add(metric.getMetricValues().get(timestamp));
            testTsList.add((double)timestamp);
          }
        }
      }

      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
        LOG.info("Not enough train/test data to perform analysis.");
        continue;
      }

      String tukeysTrainSeries = "tukeysTrainSeries";
      double[] trainTs = new double[trainTsList.size()];
      double[] trainData = new double[trainTsList.size()];
      for (int i = 0; i < trainTs.length; i++) {
        trainTs[i] = trainTsList.get(i);
        trainData[i] = trainDataList.get(i);
      }

      String tukeysTestSeries = "tukeysTestSeries";
      double[] testTs = new double[testTsList.size()];
      double[] testData = new double[testTsList.size()];
      for (int i = 0; i < testTs.length; i++) {
        testTs[i] = testTsList.get(i);
        testData[i] = testDataList.get(i);
      }

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);

      if (!tukeysNMap.containsKey(metricKey)) {
        tukeysNMap.put(metricKey, defaultTukeysN);
      }

      Map<String, String> configs = new HashMap<>();
      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));

      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);

      // getAsTimelineMetric now returns an empty list (never null) when the R
      // call failed, so this no longer NPEs on rs == null.
      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();

      for (TimelineMetric tukeysMetric : tukeysMetrics) {
        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
      }

      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
      TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
      if (!emaData.getMetrics().isEmpty()) {
        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
      }

      LOG.info("Ema anomalies size : " + emaMetricValues.size());
      // Confusion matrix treating Tukeys as ground truth and EMA as the predictor.
      int tp = 0;
      int tn = 0;
      int fp = 0;
      int fn = 0;

      for (double ts : testTs) {
        long timestamp = (long) ts;
        if (tukeysMetricValues.containsKey(timestamp)) {
          if (emaMetricValues.containsKey(timestamp)) {
            tp++;
          } else {
            fn++;
          }
        } else {
          if (emaMetricValues.containsKey(timestamp)) {
            fp++;
          } else {
            tn++;
          }
        }
      }

      // NOTE(review): 0/0 yields NaN here; NaN < 0.5 is false, so neither
      // branch below fires when there are no anomalies at all — benign.
      double recall = (double) tp / (double) (tp + fn);
      double precision = (double) tp / (double) (tp + fp);
      LOG.info("----------------------------");
      LOG.info("Precision Recall values for " + metricKey);
      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
      LOG.info("----------------------------");

      // NOTE(review): log messages say 10% but updateModel is passed 5 —
      // confirm the percentage units expected by EmaModel.updateModel.
      if (recall < 0.5) {
        LOG.info("Increasing EMA sensitivity by 10%");
        emaModel.updateModel(true, 5);
      } else if (precision < 0.5) {
        LOG.info("Decreasing EMA sensitivity by 10%");
        emaModel.updateModel(false, 5);
      }

    }

    if (emaTechnique.getTrackedEmas().isEmpty()){
      LOG.info("No EMA Technique keys tracked!!!!");
    }

    if (!timelineMetrics.getMetrics().isEmpty()) {
      metricsCollectorInterface.emitMetrics(timelineMetrics);
    }
  }

  /**
   * Converts an R {@link ResultSet} (columns: timestamps, values, anomaly
   * scores) into single-datapoint TimelineMetrics tagged with the tukeys method.
   * Returns an empty list (never null) when the result is null or empty.
   */
  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {

    List<TimelineMetric> timelineMetrics = new ArrayList<>();

    if (result == null) {
      LOG.info("ResultSet from R call is null!!");
      // Fixed: returning null here caused an NPE at the call site.
      return timelineMetrics;
    }

    if (result.resultset.size() > 0) {
      double[] ts = result.resultset.get(0);
      double[] metrics = result.resultset.get(1);
      double[] anomalyScore = result.resultset.get(2);
      for (int i = 0; i < ts.length; i++) {
        TimelineMetric timelineMetric = new TimelineMetric();
        timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
        timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
        timelineMetric.setInstanceId(null);
        timelineMetric.setStartTime((long) ts[i]);
        TreeMap<Long, Double> metricValues = new TreeMap<>();
        metricValues.put((long) ts[i], metrics[i]);

        HashMap<String, String> metadata = new HashMap<>();
        metadata.put("method", "tukeys");
        // Fixed: Java renders infinite doubles as "Infinity"/"-Infinity", so the
        // original equals("infinity") check never matched and infinite scores
        // leaked into the metadata.
        if (Double.isInfinite(anomalyScore[i])) {
          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
        } else {
          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
        }
        timelineMetric.setMetadata(metadata);

        timelineMetric.setMetricValues(metricValues);
        timelineMetrics.add(timelineMetric);
      }
    }

    return timelineMetrics;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java new file mode 100644 index 0000000..8f1eba6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.core; + + +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.rosuda.JRI.REXP; +import org.rosuda.JRI.RVector; +import org.rosuda.JRI.Rengine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class RFunctionInvoker { + + static final Log LOG = LogFactory.getLog(RFunctionInvoker.class); + public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null); + private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts"; + + private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) { + r.assign("train_ts", trainData.ts); + r.assign("train_x", trainData.values); + r.eval("train_data <- data.frame(train_ts,train_x)"); + r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")"); + + r.assign("test_ts", testData.ts); + r.assign("test_x", testData.values); + r.eval("test_data <- data.frame(test_ts,test_x)"); + r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")"); + } + + public static void setScriptsDir(String dir) { + rScriptDir = dir; + } + + public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) { + + ResultSet result; + switch (methodType) { + case "tukeys": + result = tukeys(trainData, testData, configs); + break; + case "ema": + result = ema_global(trainData, testData, configs); + break; + case "ks": + result = ksTest(trainData, testData, configs); + break; + case "hsdev": + result = hsdev(trainData, testData, configs); + break; + default: + result = tukeys(trainData, testData, configs); + break; + } + return result; + } + + public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> 
configs) { + try { + + REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')"); + + double n = Double.parseDouble(configs.get("tukeys.n")); + r.eval("n <- " + n); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_tukeys(train_data, test_data, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/ema.r" + "')"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ema_global(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/ema.r" + "')"); + + int n = Integer.parseInt(configs.get("ema.n")); + r.eval("n <- " + n); + + double w = Double.parseDouble(configs.get("ema.w")); + r.eval("w <- " + w); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ema_daily(train_data, test_data, w, n)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return 
new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/kstest.r" + "')"); + + double p_value = Double.parseDouble(configs.get("ks.p_value")); + r.eval("p_value <- " + p_value); + + loadDataSets(r, trainData, testData); + + r.eval("an <- ams_ks(train_data, test_data, p_value)"); + REXP exp = r.eval("an"); + RVector cont = (RVector) exp.getContent(); + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } + + public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) { + try { + r.eval("source('" + rScriptDir + "/hsdev.r" + "')"); + + int n = Integer.parseInt(configs.get("hsdev.n")); + r.eval("n <- " + n); + + int nhp = Integer.parseInt(configs.get("hsdev.nhp")); + r.eval("nhp <- " + nhp); + + long interval = Long.parseLong(configs.get("hsdev.interval")); + r.eval("interval <- " + interval); + + long period = Long.parseLong(configs.get("hsdev.period")); + r.eval("period <- " + period); + + loadDataSets(r, trainData, testData); + + r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)"); + REXP exp = r.eval("an2"); + RVector cont = (RVector) exp.getContent(); + + List<double[]> result = new ArrayList(); + for (int i = 0; i < cont.size(); i++) { + result.add(cont.at(i).asDoubleArray()); + } + return new ResultSet(result); + } catch (Exception e) { + LOG.error(e); + } finally { + r.end(); + } + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java new file mode 100644 index 0000000..f5ec83a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.ambari.metrics.adservice.prototype.core;

import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique;
import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Trend anomaly detection driver (prototype).
 *
 * Two-stage flow visible in this class:
 * 1) {@link #runKSTest} fetches train+test windows for each tracked metric,
 *    runs a Kolmogorov-Smirnov test, publishes any anomaly metric, and records
 *    it in {@link #trackedKsAnomalies}.
 * 2) {@link #runHsdevMethod} re-checks each recorded KS anomaly against
 *    N historical periods using the Hsdev technique, publishing only the
 *    confirmed anomalies, then clears the tracked set.
 */
public class TrendADSystem implements Serializable {

  // Source of metric data (fetch) and sink for anomaly metrics (emit).
  private MetricsCollectorInterface metricsCollectorInterface;
  // Metrics to track; repopulated from inputFile on every runKSTest call.
  private List<TrendMetric> trendMetrics;

  // Defaults: 10-minute test window and 10-minute training window.
  private long ksTestIntervalMillis = 10 * 60 * 1000;
  private long ksTrainIntervalMillis = 10 * 60 * 1000;
  private KSTechnique ksTechnique;

  private HsdevTechnique hsdevTechnique;
  // Number of past periods used as Hsdev training data (default 3).
  private int hsdevNumHistoricalPeriods = 3;

  // KS anomalies from the latest run(s), awaiting Hsdev confirmation.
  // NOTE(review): KsSingleRunKey does not override equals/hashCode, so map
  // lookups are identity-based; every put() inserts a distinct entry.
  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
  // CSV file of "metricName,appId,hostname" rows.
  // NOTE(review): never assigned in this class — readInputFile("") will fail
  // and log an error on every runKSTest call unless set externally. Confirm.
  private String inputFile = "";

  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
                       long ksTestIntervalMillis,
                       long ksTrainIntervalMillis,
                       int hsdevNumHistoricalPeriods) {

    this.metricsCollectorInterface = metricsCollectorInterface;
    this.ksTestIntervalMillis = ksTestIntervalMillis;
    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;

    this.ksTechnique = new KSTechnique();
    this.hsdevTechnique = new HsdevTechnique();

    trendMetrics = new ArrayList<>();
  }

  /**
   * Runs a KS test for each supplied trend metric over the window ending at
   * {@code currentEndTime}, publishing and tracking any anomalies found.
   *
   * @param currentEndTime end of the test window (epoch millis)
   * @param trendMetrics   metrics to test.
   *                       NOTE(review): this parameter shadows the field of the
   *                       same name — readInputFile() repopulates the FIELD,
   *                       but the loop below iterates the PARAMETER. Confirm intent.
   */
  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
    readInputFile(inputFile);

    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
      new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis)
      + " : " + new Date(ksTestIntervalStartTime) + "]");

    for (TrendMetric metric : trendMetrics) {
      String metricName = metric.metricName;
      String appId = metric.appId;
      String hostname = metric.hostname;
      String key = metricName + ":" + appId + ":" + hostname;

      // One fetch covers both windows: [trainStart, currentEndTime].
      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
        currentEndTime);

      if (ksData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for KS, metricKey = " + key);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();

      // Split the fetched points: timestamps at or before the test-window start
      // go to the training set, the rest to the test set.
      for (TimelineMetric timelineMetric : ksData.getMetrics()) {
        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
          if (timestamp <= ksTestIntervalStartTime) {
            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
            trainTsList.add((double) timestamp);
          } else {
            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
            testTsList.add((double) timestamp);
          }
        }
      }

      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
      // Require a non-empty train set at least as large as the test set.
      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
        LOG.info("Not enough train/test data to perform KS analysis.");
        continue;
      }

      // Unbox into primitive arrays for the DataSeries constructor.
      String ksTrainSeries = "KSTrainSeries";
      double[] trainTs = new double[trainTsList.size()];
      double[] trainData = new double[trainTsList.size()];
      for (int i = 0; i < trainTs.length; i++) {
        trainTs[i] = trainTsList.get(i);
        trainData[i] = trainDataList.get(i);
      }

      String ksTestSeries = "KSTestSeries";
      double[] testTs = new double[testTsList.size()];
      double[] testData = new double[testTsList.size()];
      for (int i = 0; i < testTs.length; i++) {
        testTs[i] = testTsList.get(i);
        testData[i] = testDataList.get(i);
      }

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData);
      DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData);

      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
      if (metricAnomaly == null) {
        LOG.info("No anomaly from KS test.");
      } else {
        // Publish the KS anomaly immediately, and remember it for the
        // Hsdev confirmation pass (runHsdevMethod).
        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
          ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime);
        TimelineMetrics timelineMetrics = new TimelineMetrics();
        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
        metricsCollectorInterface.emitMetrics(timelineMetrics);

        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
      }
    }

    if (trendMetrics.isEmpty()) {
      LOG.info("No Trend metrics tracked!!!!");
    }

  }

  /**
   * Wraps an anomaly as a single-point TimelineMetric so it can be emitted to
   * the collector. The anomaly score and the train/test window boundaries are
   * carried in the metric metadata; the single value is placed at testEnd.
   */
  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
                                             long testStart,
                                             long testEnd,
                                             long trainStart,
                                             long trainEnd) {

    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
    timelineMetric.setInstanceId(null);
    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
    timelineMetric.setStartTime(testEnd);
    HashMap<String, String> metadata = new HashMap<>();
    metadata.put("method", metricAnomaly.getMethodType());
    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
    metadata.put("test-start-time", String.valueOf(testStart));
    metadata.put("train-start-time", String.valueOf(trainStart));
    metadata.put("train-end-time", String.valueOf(trainEnd));
    timelineMetric.setMetadata(metadata);
    TreeMap<Long,Double> metricValues = new TreeMap<>();
    metricValues.put(testEnd, metricAnomaly.getMetricValue());
    timelineMetric.setMetricValues(metricValues);
    return timelineMetric;

  }

  /**
   * Confirms each tracked KS anomaly with the Hsdev technique: the KS test
   * window length becomes the period, and the training data is the
   * hsdevNumHistoricalPeriods periods immediately preceding the test window.
   * Confirmed anomalies are published in one batch; the tracked set is always
   * cleared at the end, whether or not anything was confirmed.
   */
  public void runHsdevMethod() {

    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();

    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {

      long hsdevTestEnd = ksSingleRunKey.endTime;
      long hsdevTestStart = ksSingleRunKey.startTime;

      // Period = length of the original KS test window.
      long period = hsdevTestEnd - hsdevTestStart;

      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
      long hsdevTrainEnd = hsdevTestStart;

      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart) +
        " : " + new Date(hsdevTrainEnd) + "]");

      String metricName = ksSingleRunKey.metricName;
      String appId = ksSingleRunKey.appId;
      String hostname = ksSingleRunKey.hostname;
      // NOTE(review): key uses '_' separators here but ':' in runKSTest; if
      // KSTechnique.updateModel keys by the runKsTest key, these won't match.
      String key = metricName + "_" + appId + "_" + hostname;

      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
        metricName,
        appId,
        hostname,
        hsdevTrainStart,
        hsdevTestEnd);

      if (hsdevData.getMetrics().isEmpty()) {
        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
        continue;
      }

      List<Double> trainTsList = new ArrayList<>();
      List<Double> trainDataList = new ArrayList<>();
      List<Double> testTsList = new ArrayList<>();
      List<Double> testDataList = new ArrayList<>();

      // Same train/test split rule as runKSTest, pivoted on hsdevTestStart.
      for (TimelineMetric timelineMetric : hsdevData.getMetrics()) {
        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
          if (timestamp <= hsdevTestStart) {
            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
            trainTsList.add((double) timestamp);
          } else {
            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
            testTsList.add((double) timestamp);
          }
        }
      }

      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
        LOG.info("Not enough train/test data to perform Hsdev analysis.");
        continue;
      }

      String hsdevTrainSeries = "HsdevTrainSeries";
      double[] trainTs = new double[trainTsList.size()];
      double[] trainData = new double[trainTsList.size()];
      for (int i = 0; i < trainTs.length; i++) {
        trainTs[i] = trainTsList.get(i);
        trainData[i] = trainDataList.get(i);
      }

      String hsdevTestSeries = "HsdevTestSeries";
      double[] testTs = new double[testTsList.size()];
      double[] testData = new double[testTsList.size()];
      for (int i = 0; i < testTs.length; i++) {
        testTs[i] = testTsList.get(i);
        testData[i] = testDataList.get(i);
      }

      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);

      DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData);
      DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData);

      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
      if (metricAnomaly == null) {
        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
        // Feed the disagreement back to the KS model for this key; the
        // semantics of (false, 10) live in KSTechnique — not visible here.
        ksTechnique.updateModel(key, false, 10);
      } else {
        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
      }
    }
    clearTrackedKsRunKeys();

    if (!hsdevMetricAnomalies.isEmpty()) {
      LOG.info("Publishing Hsdev Anomalies....");
      TimelineMetrics timelineMetrics = new TimelineMetrics();
      timelineMetrics.setMetrics(hsdevMetricAnomalies);
      metricsCollectorInterface.emitMetrics(timelineMetrics);
    }
  }

  // Drops all KS anomalies recorded since the last Hsdev pass.
  private void clearTrackedKsRunKeys() {
    trackedKsAnomalies.clear();
  }

  /**
   * Reloads the tracked-metric list (field {@link #trendMetrics}) from a CSV
   * file with one "metricName,appId,hostname" row per line. Read failures are
   * logged and leave the list as cleared.
   */
  private void readInputFile(String fileName) {
    trendMetrics.clear();
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
      for (String line; (line = br.readLine()) != null; ) {
        String[] splits = line.split(",");
        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
      }
    } catch (IOException e) {
      LOG.error("Error reading input file : " + e);
    }
  }

  /**
   * Identifies one KS run over one metric (window + metric identity).
   * NOTE(review): used as a HashMap key without equals/hashCode — identity
   * semantics only. Also a non-static inner class: it holds a hidden reference
   * to the enclosing TrendADSystem, which matters for serialization.
   */
  class KsSingleRunKey implements Serializable {

    long startTime;
    long endTime;
    String metricName;
    String appId;
    String hostname;

    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
      this.startTime = startTime;
      this.endTime = endTime;
      this.metricName = metricName;
      this.appId = appId;
      this.hostname = hostname;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java new file mode 100644 index
0000000..d4db227 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.core; + +import java.io.Serializable; + +public class TrendMetric implements Serializable { + + String metricName; + String appId; + String hostname; + + public TrendMetric(String metricName, String appId, String hostname) { + this.metricName = metricName; + this.appId = appId; + this.hostname = hostname; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java new file mode 100644 index 0000000..c19adda --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.methods; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +import java.util.List; + +public abstract class AnomalyDetectionTechnique { + + protected String methodType; + + public abstract List<MetricAnomaly> test(TimelineMetric metric); + +}
