Repository: ambari Updated Branches: refs/heads/branch-3.0-ams 97dfe6b73 -> e33b54557
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java new file mode 100644 index 0000000..d98ef0c --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype; + +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries; +import org.apache.commons.lang.ArrayUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +public class TestRFunctionInvoker { + + private static String metricName = "TestMetric"; + private static double[] ts; + private static String fullFilePath; + + @BeforeClass + public static void init() throws URISyntaxException { + + Assume.assumeTrue(System.getenv("R_HOME") != null); + ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + } + + @Test + public void testTukeys() throws URISyntaxException { + + double[] train_ts = ArrayUtils.subarray(ts, 0, 750); + double[] train_x = getRandomData(750); + DataSeries trainData = new DataSeries(metricName, train_ts, train_x); + + double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); + double[] test_x = getRandomData(250); + test_x[50] = 5.5; //Anomaly + DataSeries testData = new DataSeries(metricName, test_ts, test_x); + Map<String, String> configs = new HashMap(); + configs.put("tukeys.n", "3"); + + ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs); + Assert.assertEquals(rs.resultset.size(), 2); + Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1); + + } + + public static void main(String[] args) throws URISyntaxException { + + String metricName = "TestMetric"; + double[] ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + String fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + + double[] train_ts = ArrayUtils.subarray(ts, 0, 750); + double[] train_x = getRandomData(750); + DataSeries trainData = new DataSeries(metricName, train_ts, train_x); + + double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); + double[] test_x = getRandomData(250); + test_x[50] = 5.5; //Anomaly + DataSeries testData = new DataSeries(metricName, test_ts, test_x); + ResultSet rs; + + Map<String, String> configs = new HashMap(); + + System.out.println("TUKEYS"); + configs.put("tukeys.n", "3"); + rs = RFunctionInvoker.tukeys(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + +// System.out.println("EMA Global"); +// configs.put("ema.n", "3"); +// configs.put("ema.w", "0.8"); +// rs = RFunctionInvoker.ema_global(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// +// System.out.println("EMA Daily"); +// rs = RFunctionInvoker.ema_daily(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// +// configs.put("ks.p_value", "0.00005"); +// System.out.println("KS Test"); +// rs = RFunctionInvoker.ksTest(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// + ts = getTS(5000); + train_ts = ArrayUtils.subarray(ts, 0, 4800); + train_x = getRandomData(4800); + trainData = new DataSeries(metricName, train_ts, train_x); + test_ts = ArrayUtils.subarray(ts, 4800, 5000); + test_x = getRandomData(200); + for (int i = 0; i < 200; i++) { + test_x[i] = test_x[i] * 5; + } + testData = new DataSeries(metricName, test_ts, test_x); + configs.put("hsdev.n", "3"); + configs.put("hsdev.nhp", "3"); + configs.put("hsdev.interval", "86400000"); + configs.put("hsdev.period", "604800000"); + System.out.println("HSdev"); + rs = RFunctionInvoker.hsdev(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + + } + + static double[] getTS(int n) { + long currentTime = System.currentTimeMillis(); + double[] ts = new double[n]; + currentTime = currentTime - (currentTime % (5 * 60 * 1000)); + + for (int i = 0, j = n - 1; i < n; i++, j--) { + ts[j] = currentTime; + currentTime = currentTime - (5 * 60 * 1000); + } + return ts; + } + + static double[] getRandomData(int n) { + + UniformMetricSeries metricSeries = new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true); + return metricSeries.getSeries(n); + +// double[] metrics = new double[n]; +// Random random = new Random(); +// for (int i = 0; i < n; i++) { +// metrics[i] = random.nextDouble(); +// } +// return metrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java new file mode 100644 index 0000000..86590bd --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype; + +import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface; +import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.List; +import java.util.TreeMap; + +public class TestTukeys { + + @BeforeClass + public static void init() throws URISyntaxException { + Assume.assumeTrue(System.getenv("R_HOME") != null); + } + + @Test + public void testPointInTimeDetectionSystem() throws UnknownHostException, URISyntaxException { + + URL url = ClassLoader.getSystemResource("R-scripts"); + String fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + + MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188"); + + EmaTechnique ema = new EmaTechnique(0.5, 3); + long now = System.currentTimeMillis(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("mm9"); + metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); + metric1.setStartTime(now); + metric1.setAppId("aa9"); + metric1.setInstanceId(null); + metric1.setType("Integer"); + + //Train + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + + //2hr data. + for (int i = 0; i < 120; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 60 * 1000, metric); + } + metric1.setMetricValues(metricValues); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.addOrMergeTimelineMetric(metric1); + + metricsCollectorInterface.emitMetrics(timelineMetrics); + + List<MetricAnomaly> anomalyList = ema.test(metric1); + metricsCollectorInterface.publish(anomalyList); +// +// PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, metricsCollectorInterface, 3, 5*60*1000, 15*60*1000); +// pointInTimeADSystem.runOnce(); +// +// List<MetricAnomaly> anomalyList2 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList3 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList4 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList5 = ema.test(metric1); +// +// pointInTimeADSystem.runOnce(); +// List<MetricAnomaly> anomalyList6 = ema.test(metric1); +// +// Assert.assertTrue(anomalyList6.size() < anomalyList.size()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java new file mode 100644 index 0000000..fe7dba9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import org.junit.Assert; +import org.junit.Test; + +public class MetricSeriesGeneratorTest { + + @Test + public void testUniformSeries() { + + UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() <= 6 && metricSeries.nextValue() >= 4); + + double[] uniformSeries = MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, true); + Assert.assertTrue(uniformSeries.length == 50); + + for (int i = 0; i < uniformSeries.length; i++) { + double value = uniformSeries[i]; + + if (value > 10 * 1.2) { + Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6); + } else { + Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2); + } + } + } + + @Test + public void testNormalSeries() { + NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() <= 3 && metricSeries.nextValue() >= -3); + } + + @Test + public void testMonotonicSeries() { + + MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 0, 0, 0, true); + Assert.assertTrue(metricSeries.nextValue() == 0); + Assert.assertTrue(metricSeries.nextValue() == 0.5); + + double[] incSeries = MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, true); + Assert.assertTrue(incSeries.length == 20); + for (int i = 0; i < incSeries.length; i++) { + Assert.assertTrue(incSeries[i] == i * 0.5); + } + } + + @Test + public void testDualBandSeries() { + double[] dualBandSeries = MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4); + Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6); + Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6); + Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5); + Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5); + Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6); + } + + @Test + public void testStepSeries() { + double[] stepSeries = MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, true); + + Assert.assertTrue(stepSeries[0] == 10); + Assert.assertTrue(stepSeries[4] == 10); + + Assert.assertTrue(stepSeries[5] == 10*1.5); + Assert.assertTrue(stepSeries[9] == 10*1.5); + + Assert.assertTrue(stepSeries[10] == 10*1.5*1.5); + Assert.assertTrue(stepSeries[14] == 10*1.5*1.5); + } + + @Test + public void testSteadySeriesWithTurbulence() { + double[] steadySeriesWithTurbulence = MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 5, 1); + + int count = 0; + for (int i = 0; i < steadySeriesWithTurbulence.length; i++) { + if (steadySeriesWithTurbulence[i] == 10) { + count++; + } + } + Assert.assertTrue(count == 5); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java index 252f7d4..b43a87c 100644 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java @@ -62,11 +62,7 @@ public class RawMetricsPublisherTest { new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing()); -<<<<<<< HEAD String expectedResult = "{\"metrics\":[{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}"; -======= - String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}"; ->>>>>>> trunk Assert.assertNotNull(rawJson); Assert.assertEquals(expectedResult, rawJson); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-spark/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-spark/pom.xml b/ambari-metrics/ambari-metrics-spark/pom.xml deleted file mode 100644 index 4732cb5..0000000 --- a/ambari-metrics/ambari-metrics-spark/pom.xml +++ /dev/null @@ -1,151 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <parent> - <artifactId>ambari-metrics</artifactId> - <groupId>org.apache.ambari</groupId> - <version>2.0.0.0-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>ambari-metrics-spark</artifactId> - <version>2.0.0.0-SNAPSHOT</version> - <properties> - <scala.version>2.10.4</scala.version> - </properties> - - <repositories> - <repository> - <id>scala-tools.org</id> - <name>Scala-Tools Maven2 Repository</name> - <url>http://scala-tools.org/repo-releases</url> - </repository> - </repositories> - - <pluginRepositories> - <pluginRepository> - <id>scala-tools.org</id> - <name>Scala-Tools Maven2 Repository</name> - <url>http://scala-tools.org/repo-releases</url> - </pluginRepository> - </pluginRepositories> - - <dependencies> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.4</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.specs</groupId> - <artifactId>specs</artifactId> - <version>1.2.5</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-spark</artifactId> - <version>4.7.0-HBase-1.1</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-alertservice</artifactId> - <version>2.0.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api-scala_2.10</artifactId> - <version>2.8.2</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_2.10</artifactId> - <version>2.1.1</version> - </dependency> - </dependencies> - - <build> - <sourceDirectory>src/main/scala</sourceDirectory> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>compile</goal> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <scalaVersion>${scala.version}</scalaVersion> - <args> - <arg>-target:jvm-1.5</arg> - </args> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <configuration> - <downloadSources>true</downloadSources> - <buildcommands> - <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <additionalProjectnatures> - <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> - </additionalProjectnatures> - <classpathContainers> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> - </classpathContainers> - </configuration> - </plugin> - </plugins> - </build> - <reporting> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <configuration> - <scalaVersion>${scala.version}</scalaVersion> - </configuration> - </plugin> - </plugins> - </reporting> -</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala deleted file mode 100644 index e51a47f..0000000 --- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.spark - - -import java.util -import java.util.logging.LogManager - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.ambari.metrics.alertservice.prototype.MetricsCollectorInterface -import org.apache.spark.SparkConf -import org.apache.spark.streaming._ -import org.apache.spark.streaming.kafka._ -import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly} -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique} -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics -import org.apache.log4j.Logger -import org.apache.spark.storage.StorageLevel - -import scala.collection.JavaConversions._ -import org.apache.logging.log4j.scala.Logging - -object MetricAnomalyDetector extends Logging { - - - var zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181" - var groupId = "ambari-metrics-group" - var topicName = "ambari-metrics-topic" - var numThreads = 1 - val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]() - - def main(args: Array[String]): Unit = { - - @transient - lazy val log: Logger = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger") - - if (args.length < 5) { - System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol>") - System.exit(1) - } - - for (method <- args(0).split(",")) { - if (method == "ema") anomalyDetectionModels :+ new EmaTechnique(0.5, 3) - } - - val appIds = util.Arrays.asList(args(1).split(",")) - - val collectorHost = args(2) - val collectorPort = args(3) - val collectorProtocol = args(4) - - val anomalyMetricPublisher: MetricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort) - - val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector") - - val streamingContext = new StreamingContext(sparkConf, Duration(10000)) - - val emaModel = new EmaModelLoader().load(streamingContext.sparkContext, "/tmp/model/ema") - - val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2) - kafkaStream.print() - - var timelineMetricsStream = kafkaStream.map( message => { - val mapper = new ObjectMapper - val metrics = mapper.readValue(message._2, classOf[TimelineMetrics]) - metrics - }) - timelineMetricsStream.print() - - var appMetricStream = timelineMetricsStream.map( timelineMetrics => { - (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics) - }) - appMetricStream.print() - - var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => { - appIds.contains(appMetricTuple._1) - } ) - filteredAppMetricStream.print() - - filteredAppMetricStream.foreachRDD( rdd => { - rdd.foreach( appMetricTuple => { - val timelineMetrics = appMetricTuple._2 - logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName) - log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName) - for (timelineMetric <- timelineMetrics.getMetrics) { - var anomalies = emaModel.test(timelineMetric) - anomalyMetricPublisher.publish(anomalies) - } - }) - }) - - streamingContext.start() - streamingContext.awaitTermination() - } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala deleted file mode 100644 index edd6366..0000000 --- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.spark - -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric -import org.apache.spark.mllib.stat.Statistics -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.rdd.RDD - -object SparkPhoenixReader { - - def main(args: Array[String]) { - - if (args.length < 6) { - System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") - System.exit(1) - } - - var metricName = args(0) - var appId = args(1) - var hostname = args(2) - var weight = args(3).toDouble - var timessdev = args(4).toInt - var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure - var modelDir = args(6) - - val conf = new SparkConf() - conf.set("spark.app.name", "AMSAnomalyModelBuilder") - //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") - - var sc = new SparkContext(conf) - val sqlContext = new SQLContext(sc) - - val currentTime = System.currentTimeMillis() - val oneDayBack = currentTime - 24*60*60*1000 - - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) - df.registerTempTable("METRIC_RECORD") - val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + - "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) - - var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] - result.collect().foreach( - t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) - ) - - //val seriesName = result.head().getString(0) - //val hostname = result.head().getString(1) - //val appId = result.head().getString(2) - - val timelineMetric = new TimelineMetric() - timelineMetric.setMetricName(metricName) - timelineMetric.setAppId(appId) - timelineMetric.setHostName(hostname) - timelineMetric.setMetricValues(metricValues) - -// var emaModel = new EmaTechnique() -// emaModel.train(timelineMetric, weight, timessdev) -// emaModel.save(sc, modelDir) - -// var metricData:Seq[Double] = Seq.empty -// result.collect().foreach( -// t => metricData :+ t.getDouble(4) / t.getInt(5) -// ) -// val data: RDD[Double] = sc.parallelize(metricData) -// val myCDF = Map(0.1 -> 0.2, 0.15 -> 0.6, 0.2 -> 0.05, 0.3 -> 0.05, 0.25 -> 0.1) -// val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF) - - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index 4f522f7..fd1c8bd 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -342,12 +342,6 @@ </dependency> <dependency> - <groupId>org.apache.ambari</groupId> - <artifactId>ambari-metrics-alertservice</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java deleted file mode 100644 index 2420ef3..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java +++ /dev/null @@ -1,87 +0,0 @@ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics; - -import org.apache.ambari.metrics.alertservice.prototype.TestSeriesInputRequest; -import org.apache.ambari.metrics.alertservice.seriesgenerator.AbstractMetricSeries; -import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -public class TestMetricSeriesGenerator implements Runnable { - - private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>(); - private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class); - private TimelineMetricStore metricStore; - private String hostname; - - public TestMetricSeriesGenerator(TimelineMetricStore metricStore) { - this.metricStore = metricStore; - try { - this.hostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - } - - public void addSeries(TestSeriesInputRequest inputRequest) { - if (!configuredSeries.containsKey(inputRequest)) { - AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs()); - configuredSeries.put(inputRequest, metricSeries); - LOG.info("Added series " + inputRequest.getSeriesName()); - } - } - - public void removeSeries(String seriesName) { - boolean isPresent = false; - TestSeriesInputRequest tbd = null; - for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) { - if (inputRequest.getSeriesName().equals(seriesName)) { - isPresent = true; - tbd = inputRequest; - } - } - if (isPresent) { - LOG.info("Removing series " + seriesName); - configuredSeries.remove(tbd); - } else { - LOG.info("Series not found : " + seriesName); - } - } - - @Override - public void run() { - long currentTime = System.currentTimeMillis(); - TimelineMetrics timelineMetrics = new TimelineMetrics(); - - for (TestSeriesInputRequest input : configuredSeries.keySet()) { - AbstractMetricSeries metricSeries = configuredSeries.get(input); - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(input.getSeriesName()); - timelineMetric.setAppId("anomaly-engine-test-metric"); - timelineMetric.setInstanceId(null); - timelineMetric.setStartTime(currentTime); - timelineMetric.setHostName(hostname); - TreeMap<Long, Double> metricValues = new TreeMap(); - metricValues.put(currentTime, metricSeries.nextValue()); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.addOrMergeTimelineMetric(timelineMetric); - LOG.info("Emitting metric with appId = " + timelineMetric.getAppId()); - } - try { - LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series."); - metricStore.putMetrics(timelineMetrics); - } catch (Exception e) { - LOG.error(e); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java deleted file mode 100644 index 6f7b14a..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -@Singleton -@Path("/ws/v1/metrictestservice") -public class MetricAnomalyDetectorTestService { - - private static final Log LOG = LogFactory.getLog(MetricAnomalyDetectorTestService.class); - - @Inject - public MetricAnomalyDetectorTestService() { - } - - private void init(HttpServletResponse response) { - response.setContentType(null); - } - - @Path("/anomaly") - @POST - @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postAnomalyDetectionRequest( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - MetricAnomalyDetectorTestInput input) { - - init(res); - if (input == null) { - return new TimelinePutResponse(); - } - - try { - return null; - } catch (Exception e) { - throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); - } - } - - @GET - @Path("/dataseries") - @Produces({MediaType.APPLICATION_JSON}) - public TimelineMetrics getTestDataSeries( - @Context HttpServletRequest req, - @Context HttpServletResponse res, - @QueryParam("type") String seriesType, - @QueryParam("configs") String config - ) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 20aba23..5d9bb35 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -36,7 +36,6 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.TestMetricSeriesGenerator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier; http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 4e3a803..a68ac63 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -29,13 +29,12 @@ <module>ambari-metrics-kafka-sink</module> <module>ambari-metrics-storm-sink</module> <module>ambari-metrics-storm-sink-legacy</module> - <module>ambari-metrics-alertservice</module> <module>ambari-metrics-timelineservice</module> <module>ambari-metrics-host-monitoring</module> <module>ambari-metrics-grafana</module> <module>ambari-metrics-assembly</module> <module>ambari-metrics-host-aggregator</module> - <module>ambari-metrics-spark</module> + <module>ambari-metrics-anomaly-detector</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
