This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch branch-feature-AMBARI-23212 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit af79c69f0a4a710b90fc2582c2c9f39febfad468 Author: Aravindan Vijayan <avija...@hortonworks.com> AuthorDate: Fri Oct 6 10:53:28 2017 -0700 AMBARI-22163 : Anomaly Storage: Design Metric anomalies schema. (avijayan) --- ambari-logsearch/ambari-logsearch-it/pom.xml | 2 +- .../pom.xml | 33 ++++++- .../adservice/common/ADServiceConfiguration.scala | 74 ++++++++++++++ .../adservice/common/PhoenixQueryConstants.scala | 109 +++++++++++++++++++++ .../adservice/db/PhoenixAnomalyStoreAccessor.scala | 67 +++++++++++++ .../spark/prototype/SparkPhoenixReader.scala | 92 ++++++++--------- .../common/ADManagerConfigurationTest.scala | 23 +++++ .../db/PhoenixAnomalyStoreAccessorTest.scala | 26 +++++ ambari-metrics/ambari-metrics-common/pom.xml | 46 +++++++++ .../sink}/timeline/query/ConnectionProvider.java | 5 +- .../timeline/query/DefaultPhoenixDataSource.java | 20 +++- .../timeline/query/PhoenixConnectionProvider.java | 2 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 23 +---- .../TestApplicationHistoryServer.java | 2 +- .../timeline/AbstractMiniHBaseClusterTest.java | 6 +- .../metrics/timeline/PhoenixHBaseAccessorTest.java | 4 +- 16 files changed, 454 insertions(+), 80 deletions(-) diff --git a/ambari-logsearch/ambari-logsearch-it/pom.xml b/ambari-logsearch/ambari-logsearch-it/pom.xml index db3e09f..b3a1d45 100644 --- a/ambari-logsearch/ambari-logsearch-it/pom.xml +++ b/ambari-logsearch/ambari-logsearch-it/pom.xml @@ -122,7 +122,7 @@ </dependencies> <build> - <testOutputDirectory>target/classes</testOutputDirectory> + <testOutputDirectory>test/target/classes</testOutputDirectory> <testResources> <testResource> <directory>src/test/java/</directory> diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index 1a10f86..6f8f8c1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -29,8 +29,9 @@ <artifactId>ambari-metrics-anomaly-detection-service</artifactId> <version>2.0.0.0-SNAPSHOT</version> <properties> - <scala.version>2.10.4</scala.version> + <scala.version>2.11.1</scala.version> <scala.binary.version>2.11</scala.binary.version> + <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> </properties> <repositories> @@ -201,5 +202,35 @@ <version>2.1.1</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.11</artifactId> + <version>3.0.1</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala new file mode 100644 index 0000000..248c74e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.common + +import java.net.{MalformedURLException, URISyntaxException} + +import org.apache.hadoop.conf.Configuration + +object ADServiceConfiguration { + + private val AMS_AD_SITE_CONFIGURATION_FILE = "ams-ad-site.xml" + private val HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml" + + val ANOMALY_METRICS_TTL = "timeline.metrics.anomaly.data.ttl" + + private var hbaseConf: org.apache.hadoop.conf.Configuration = _ + private var adConf: org.apache.hadoop.conf.Configuration = _ + + def initConfigs(): Unit = { + + var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader + if (classLoader == null) classLoader = getClass.getClassLoader + + try { + val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE) + if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.") + + hbaseConf = new Configuration(true) + hbaseConf.addResource(hbaseResUrl.toURI.toURL) + + val adSystemConfigUrl = classLoader.getResource(AMS_AD_SITE_CONFIGURATION_FILE) + if (adSystemConfigUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No ams-ad-site present in the classpath") + + adConf = new Configuration(true) + adConf.addResource(adSystemConfigUrl.toURI.toURL) + + } catch { + case me : MalformedURLException => println("MalformedURLException") + case ue : URISyntaxException => println("URISyntaxException") + } + } + + def getHBaseConf: org.apache.hadoop.conf.Configuration = { + hbaseConf + } + + def getAdConf: org.apache.hadoop.conf.Configuration = { + adConf + } + + def getAnomalyDataTtl: Int = { + if (adConf != null) return adConf.get(ANOMALY_METRICS_TTL, "604800").toInt + 604800 + } + + /** + * ttl + * + */ +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala new file mode 100644 index 0000000..5e90d2b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.common + +object PhoenixQueryConstants { + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* Table Name constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val METRIC_PROFILE_TABLE_NAME = "METRIC_PROFILE" + val METHOD_PARAMETERS_TABLE_NAME = "METHOD_PARAMETERS" + val PIT_ANOMALY_METRICS_TABLE_NAME = "PIT_METRIC_ANOMALIES" + val TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES" + val MODEL_SNAPSHOT = "MODEL_SNAPSHOT" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* CREATE statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val CREATE_METRIC_PROFILE_TABLE = "" + + val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + val CREATE_PIT_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "ANOMALY_TIMESTAMP UNSIGNED_LONG NOT NULL, " + + "METRIC_VALUE DOUBLE, " + + "SEASONAL_INFO VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_SNAPSHOT VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " + + "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_SNAPSHOT VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20), " + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR " + + "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* UPSERT statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)" + + val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " + + "SEASONAL_INFO, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, " + + "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* GET statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val GET_METHOD_PAREMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s" + + val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " + + "ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METRIC_METRIC_UUID = ? AND ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " + + "ORDER BY ANOMALY_SCORE DESC" + + val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " + + "ANOMALY_PERIOD_START, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METHOD = ? AND ANOMALY_PERIOD_END > ? " + + "AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC" + + val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE UUID = %s AND METHOD_NAME = %s" + +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala new file mode 100644 index 0000000..6f33e56 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.db + +import java.sql.{Connection, SQLException} + +import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants} +import org.apache.hadoop.hbase.util.RetryCounterFactory +import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider} +import java.util.concurrent.TimeUnit.SECONDS + +object PhoenixAnomalyStoreAccessor { + + private var datasource: PhoenixConnectionProvider = _ + + def initAnomalyMetricSchema(): Unit = { + + val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf) + val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt) + + val ttl = ADServiceConfiguration.getAnomalyDataTtl + try { + var conn = datasource.getConnectionRetryingOnException(retryCounterFactory) + var stmt = conn.createStatement + + val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE, + PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME) + stmt.executeUpdate(methodParametersSql) + + val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL, + PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME, + ttl.asInstanceOf[Object]) + stmt.executeUpdate(pointInTimeAnomalySql) + + val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL, + PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME, + ttl.asInstanceOf[Object]) + stmt.executeUpdate(trendAnomalySql) + + val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE, + PhoenixQueryConstants.MODEL_SNAPSHOT) + stmt.executeUpdate(snapshotSql) + + conn.commit() + } catch { + case e: SQLException => throw e + } + } + + @throws[SQLException] + def getConnection: Connection = datasource.getConnection +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala index 6e1ae07..ac00764 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala @@ -26,52 +26,52 @@ object SparkPhoenixReader { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") - System.exit(1) - } - - var metricName = args(0) - var appId = args(1) - var hostname = args(2) - var weight = args(3).toDouble - var timessdev = args(4).toInt - var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure - var modelDir = args(6) - - val conf = new SparkConf() - conf.set("spark.app.name", "AMSAnomalyModelBuilder") - //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") - - var sc = new SparkContext(conf) - val sqlContext = new SQLContext(sc) - - val currentTime = System.currentTimeMillis() - val oneDayBack = currentTime - 24*60*60*1000 - - val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) - df.registerTempTable("METRIC_RECORD") - val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + - "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) - - var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] - result.collect().foreach( - t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) - ) - - //val seriesName = result.head().getString(0) - //val hostname = result.head().getString(1) - //val appId = result.head().getString(2) - - val timelineMetric = new TimelineMetric() - timelineMetric.setMetricName(metricName) - timelineMetric.setAppId(appId) - timelineMetric.setHostName(hostname) - timelineMetric.setMetricValues(metricValues) - - var emaModel = new EmaTechnique(weight, timessdev) - emaModel.test(timelineMetric) - emaModel.save(sc, modelDir) +// if (args.length < 6) { +// System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") +// System.exit(1) +// } +// +// var metricName = args(0) +// var appId = args(1) +// var hostname = args(2) +// var weight = args(3).toDouble +// var timessdev = args(4).toInt +// var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure +// var modelDir = args(6) +// +// val conf = new SparkConf() +// conf.set("spark.app.name", "AMSAnomalyModelBuilder") +// //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") +// +// var sc = new SparkContext(conf) +// val sqlContext = new SQLContext(sc) +// +// val currentTime = System.currentTimeMillis() +// val oneDayBack = currentTime - 24*60*60*1000 +// +// val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) +// df.registerTempTable("METRIC_RECORD") +// val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + +// "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) +// +// var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] +// result.collect().foreach( +// t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) +// ) +// +// //val seriesName = result.head().getString(0) +// //val hostname = result.head().getString(1) +// //val appId = result.head().getString(2) +// +// val timelineMetric = new TimelineMetric() +// timelineMetric.setMetricName(metricName) +// timelineMetric.setAppId(appId) +// timelineMetric.setHostName(hostname) +// timelineMetric.setMetricValues(metricValues) +// +// var emaModel = new EmaTechnique(weight, timessdev) +// emaModel.test(timelineMetric) +// emaModel.save(sc, modelDir) } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala new file mode 100644 index 0000000..535dc9e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala @@ -0,0 +1,23 @@ +package org.apache.ambari.metrics.adservice.common + +import org.scalatest.FlatSpec + +import scala.collection.mutable + +class ADServiceConfigurationTest extends FlatSpec { + + "A Stack" should "pop values in last-in-first-out order" in { + val stack = new mutable.Stack[Int] + stack.push(1) + stack.push(2) + assert(stack.pop() === 2) + assert(stack.pop() === 1) + } + + it should "throw NoSuchElementException if an empty stack is popped" in { + val emptyStack = new mutable.Stack[String] + assertThrows[NoSuchElementException] { + emptyStack.pop() + } + } +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala new file mode 100644 index 0000000..142e98a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.db + +import org.scalatest.FunSuite + +class PhoenixAnomalyStoreAccessorTest extends FunSuite { + + test("testInitAnomalyMetricSchema") { + } + +} diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 4f08820..5477270 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -26,6 +26,13 @@ <modelVersion>4.0.0</modelVersion> <artifactId>ambari-metrics-common</artifactId> <name>Ambari Metrics Common</name> + + <properties> + <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> + <hbase.version>1.1.2.2.6.0.3-8</hbase.version> + <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version> + </properties> + <build> <plugins> <plugin> @@ -126,6 +133,45 @@ <dependencies> <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-core</artifactId> + <version>${phoenix.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> <version>2.10.0</version> diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java similarity index 79% rename from ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java rename to ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java index 24239a0..72e5fb5 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; +package org.apache.hadoop.metrics2.sink.timeline.query; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + import java.sql.Connection; import java.sql.SQLException; @@ -26,4 +28,5 @@ import java.sql.SQLException; */ public interface ConnectionProvider { public Connection getConnection() throws SQLException; + public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException; } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java similarity index 81% rename from ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java rename to ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java index c5761f7..a28a433 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; +package org.apache.hadoop.metrics2.sink.timeline.query; import org.apache.commons.logging.Log; @@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import java.io.IOException; import java.sql.Connection; @@ -87,4 +89,20 @@ public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { } } + public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java similarity index 92% rename from ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java rename to ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java index cacbcfb..194c769 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java @@ -1,4 +1,4 @@ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; +package org.apache.hadoop.metrics2.sink.timeline.query; import org.apache.hadoop.hbase.client.HBaseAdmin; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index f470c58..f8d31f7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -140,8 +140,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; @@ -458,23 +458,6 @@ public class PhoenixHBaseAccessor { return mapper.readValue(json, metricValuesTypeRef); } - private Connection getConnectionRetryingOnException() - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } - /** * Get JDBC connection to HBase store. Assumption is that the hbase * configuration is present on the classpath and loaded by the caller into @@ -507,7 +490,7 @@ public class PhoenixHBaseAccessor { try { LOG.info("Initializing metrics schema..."); - conn = getConnectionRetryingOnException(); + conn = dataSource.getConnectionRetryingOnException(retryCounterFactory); stmt = conn.createStatement(); // Metadata diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 03205e7..7b70a80 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; import org.apache.zookeeper.ClientCnxn; import org.easymock.EasyMock; import org.junit.After; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 3a42db9..40691d6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -22,13 +22,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.assertj.core.api.Assertions.assertThat; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; import java.io.IOException; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -48,7 +44,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index 7be3c0d..97d2512 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -32,19 +32,17 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.phoenix.exception.PhoenixIOException; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -- To stop receiving notification emails like this one, please contact avija...@apache.org.