This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 8357de8ad1c56abb0daff44c4905301cd69ff1a3 Author: Aravindan Vijayan <[email protected]> AuthorDate: Mon Nov 20 10:46:13 2017 -0800 AMBARI-22470 : Refine Metric Definition Service and AD Query service. (avijayan) --- .../pom.xml | 63 +++-- .../src/main/resources/config.yml | 2 +- .../src/main/resources/hbase-site.xml | 286 +++++++++++++++++++++ .../adservice/app/ADServiceScalaModule.scala | 50 ++++ .../adservice/app/AnomalyDetectionApp.scala | 10 +- .../adservice/app/AnomalyDetectionAppConfig.scala | 4 +- .../adservice/app/AnomalyDetectionAppModule.scala | 5 +- .../configuration/HBaseConfiguration.scala | 3 + .../AdAnomalyStoreAccessor.scala} | 19 +- .../adservice/db/AdMetadataStoreAccessorImpl.scala | 96 +++++++ .../metrics/adservice/db/ConnectionProvider.scala | 45 ++++ .../adservice/db/DefaultPhoenixDataSource.scala | 79 ++++++ .../adservice/db/LevelDbStoreAccessor.scala | 56 ---- .../metrics/adservice/db/MetadataDatasource.scala | 6 + .../adservice/db/PhoenixAnomalyStoreAccessor.scala | 75 ++++-- .../adservice/db/PhoenixConnectionProvider.scala | 66 +++++ .../adservice/db/PhoenixQueryConstants.scala | 12 +- .../adservice/leveldb/LevelDBDatasource.scala | 17 +- .../adservice/metadata/ADMetadataProvider.scala | 86 ++++--- .../metadata/InputMetricDefinitionParser.scala | 24 +- .../adservice/metadata/MetricDefinition.scala | 2 + .../metadata/MetricDefinitionService.scala | 16 +- .../metadata/MetricDefinitionServiceImpl.scala | 73 ++++-- .../metrics/adservice/metadata/MetricKey.scala | 3 + .../metadata/MetricMetadataProvider.scala | 2 +- ...yInstance.scala => MetricAnomalyInstance.scala} | 7 +- .../adservice/resource/AnomalyResource.scala | 55 +++- .../resource/MetricDefinitionResource.scala | 77 +++++- .../metrics/adservice/resource/RootResource.scala | 5 +- .../metrics/adservice/service/ADQueryService.scala | 6 +- .../adservice/service/ADQueryServiceImpl.scala | 25 +- .../adservice/service/AbstractADService.scala | 44 ++++ .../pointintime/PointInTimeAnomalyInstance.scala | 4 +- .../subsystem/trend/TrendAnomalyInstance.scala | 4 +- .../adservice/app/DefaultADResourceSpecTest.scala | 5 +- .../metadata/AMSMetadataProviderTest.scala | 16 +- .../metadata/MetricSourceDefinitionTest.scala | 16 +- ambari-metrics/ambari-metrics-common/pom.xml | 45 ---- .../metrics2/sink/timeline/TimelineMetricKey.java | 59 ----- .../timeline/HBaseTimelineMetricsService.java | 36 ++- .../metrics/timeline/PhoenixHBaseAccessor.java | 26 +- .../metrics/timeline/TimelineMetricStore.java | 3 +- .../timeline/query/ConnectionProvider.java | 3 +- .../timeline/query/DefaultPhoenixDataSource.java | 18 +- .../timeline/query/PhoenixConnectionProvider.java | 2 +- .../webapp/TimelineWebServices.java | 12 +- .../TestApplicationHistoryServer.java | 2 +- .../timeline/AbstractMiniHBaseClusterTest.java | 13 +- .../metrics/timeline/PhoenixHBaseAccessorTest.java | 11 +- .../metrics/timeline/TestTimelineMetricStore.java | 3 +- 50 files changed, 1223 insertions(+), 374 deletions(-) diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index 142f02f..c6927dd 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -34,10 +34,12 @@ <properties> <scala.version>2.12.3</scala.version> <scala.binary.version>2.11</scala.binary.version> - <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> <jackson.version>2.9.1</jackson.version> <dropwizard.version>1.2.0</dropwizard.version> <spark.version>2.2.0</spark.version> + <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> + <hbase.version>1.1.2.2.6.0.3-8</hbase.version> + <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version> </properties> <repositories> @@ -64,6 +66,7 @@ <directory>src/main/resources</directory> <includes> <include>**/*.yml</include> + <include>**/*.xml</include> <include>**/*.txt</include> </includes> </resource> @@ -145,6 +148,28 @@ <exclude>META-INF/*.RSA</exclude> </excludes> </filter> + <filter> + <artifact>org.apache.phoenix:phoenix-core</artifact> + <excludes> + <exclude>org/joda/time/**</exclude> + <exclude>com/codahale/metrics/**</exclude> + <exclude>com/google/common/collect/**</exclude> + </excludes> + </filter> + <filter> + <artifact>org.apache.phoenix:phoenix-core</artifact> + <excludes> + <exclude>org/joda/time/**</exclude> + <exclude>com/codahale/metrics/**</exclude> + <exclude>com/google/common/collect/**</exclude> + </excludes> + </filter> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>com/sun/jersey/**</exclude> + </excludes> + </filter> </filters> </configuration> <executions> @@ -245,33 +270,25 @@ </dependency> <dependency> <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-spark</artifactId> - <version>4.10.0-HBase-1.1</version> + <artifactId>phoenix-core</artifactId> + <version>${phoenix.version}</version> <exclusions> <exclusion> - <artifactId>jersey-server</artifactId> - <groupId>com.sun.jersey</groupId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> </exclusion> <exclusion> - <artifactId>jersey-core</artifactId> - <groupId>com.sun.jersey</groupId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> </exclusion> <exclusion> - <artifactId>jersey-client</artifactId> + <artifactId>jersey-core</artifactId> <groupId>com.sun.jersey</groupId> </exclusion> <exclusion> - <artifactId>jersey-guice</artifactId> - <groupId>com.sun.jersey.contribs</groupId> - </exclusion> - <exclusion> - <artifactId>jersey-json</artifactId> + <artifactId>jersey-server</artifactId> <groupId>com.sun.jersey</groupId> </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> </exclusions> </dependency> <dependency> @@ -379,6 +396,10 @@ <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> + <exclusion> + <artifactId>jersey-server</artifactId> + <groupId>org.glassfish.jersey.core</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -444,6 +465,12 @@ <artifactId>leveldb</artifactId> <version>0.9</version> </dependency> + <!-- https://mvnrepository.com/artifact/org.scalaj/scalaj-http --> + <dependency> + <groupId>org.scalaj</groupId> + <artifactId>scalaj-http_2.12</artifactId> + <version>2.3.0</version> + </dependency> <dependency> <groupId>junit</groupId> @@ -454,7 +481,7 @@ <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <version>21.0</version> + <version>18.0</version> </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml index 9402f6e..7de06b4 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml @@ -27,7 +27,7 @@ metricsCollector: hosts: host1,host2 port: 6188 protocol: http - metadataEndpoint: /v1/timeline/metrics/metadata/keys + metadataEndpoint: /ws/v1/timeline/metrics/metadata/key adQueryService: anomalyDataTtl: 604800 diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml new file mode 100644 index 0000000..66f0454 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml @@ -0,0 +1,286 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + + <property> + <name>dfs.client.read.shortcircuit</name> + <value>true</value> + </property> + + <property> + <name>hbase.client.scanner.caching</name> + <value>10000</value> + </property> + + <property> + <name>hbase.client.scanner.timeout.period</name> + <value>300000</value> + </property> + + <property> + <name>hbase.cluster.distributed</name> + <value>false</value> + </property> + + <property> + <name>hbase.hregion.majorcompaction</name> + <value>0</value> + </property> + + <property> + <name>hbase.hregion.max.filesize</name> + <value>4294967296</value> + </property> + + <property> + <name>hbase.hregion.memstore.block.multiplier</name> + <value>4</value> + </property> + + <property> + <name>hbase.hregion.memstore.flush.size</name> + <value>134217728</value> + </property> + + <property> + <name>hbase.hstore.blockingStoreFiles</name> + <value>200</value> + </property> + + <property> + <name>hbase.hstore.flusher.count</name> + <value>2</value> + </property> + + <property> + <name>hbase.local.dir</name> + <value>${hbase.tmp.dir}/local</value> + </property> + + <property> + <name>hbase.master.info.bindAddress</name> + <value>0.0.0.0</value> + </property> + + <property> + <name>hbase.master.info.port</name> + <value>61310</value> + </property> + + <property> + <name>hbase.master.normalizer.class</name> + <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value> + </property> + + <property> + <name>hbase.master.port</name> + <value>61300</value> + </property> + + <property> + <name>hbase.master.wait.on.regionservers.mintostart</name> + <value>1</value> + </property> + + <property> + <name>hbase.normalizer.enabled</name> + <value>false</value> + </property> + + <property> + <name>hbase.normalizer.period</name> + <value>600000</value> + </property> + + <property> + <name>hbase.regionserver.global.memstore.lowerLimit</name> + <value>0.3</value> + </property> + + <property> + <name>hbase.regionserver.global.memstore.upperLimit</name> + <value>0.35</value> + </property> + + <property> + <name>hbase.regionserver.info.port</name> + <value>61330</value> + </property> + + <property> + <name>hbase.regionserver.port</name> + <value>61320</value> + </property> + + <property> + <name>hbase.regionserver.thread.compaction.large</name> + <value>2</value> + </property> + + <property> + <name>hbase.regionserver.thread.compaction.small</name> + <value>3</value> + </property> + + <property> + <name>hbase.replication</name> + <value>false</value> + </property> + + <property> + <name>hbase.rootdir</name> + <value>file:///var/lib/ambari-metrics-collector/hbase</value> + </property> + + <property> + <name>hbase.rpc.timeout</name> + <value>300000</value> + </property> + + <property> + <name>hbase.snapshot.enabled</name> + <value>false</value> + </property> + + <property> + <name>hbase.superuser</name> + <value>activity_explorer,activity_analyzer</value> + </property> + + <property> + <name>hbase.tmp.dir</name> + <value>/var/lib/ambari-metrics-collector/hbase-tmp</value> + </property> + + <property> + <name>hbase.zookeeper.leaderport</name> + <value>61388</value> + </property> + + <property> + <name>hbase.zookeeper.peerport</name> + <value>61288</value> + </property> + + <property> + <name>hbase.zookeeper.property.clientPort</name> + <value>61181</value> + </property> + + <property> + <name>hbase.zookeeper.property.dataDir</name> + <value>${hbase.tmp.dir}/zookeeper</value> + </property> + + <property> + <name>hbase.zookeeper.property.tickTime</name> + <value>6000</value> + </property> + + <property> + <name>hbase.zookeeper.quorum</name> + <value>c6401.ambari.apache.org</value> + <final>true</final> + </property> + + <property> + <name>hfile.block.cache.size</name> + <value>0.3</value> + </property> + + <property> + <name>phoenix.coprocessor.maxMetaDataCacheSize</name> + <value>20480000</value> + </property> + + <property> + <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name> + <value>60000</value> + </property> + + <property> + <name>phoenix.groupby.maxCacheSize</name> + <value>307200000</value> + </property> + + <property> + <name>phoenix.mutate.batchSize</name> + <value>10000</value> + </property> + + <property> + <name>phoenix.query.keepAliveMs</name> + <value>300000</value> + </property> + + <property> + <name>phoenix.query.maxGlobalMemoryPercentage</name> + <value>15</value> + </property> + + <property> + <name>phoenix.query.rowKeyOrderSaltedTable</name> + <value>true</value> + </property> + + <property> + <name>phoenix.query.spoolThresholdBytes</name> + <value>20971520</value> + </property> + + <property> + <name>phoenix.query.timeoutMs</name> + <value>300000</value> + </property> + + <property> + <name>phoenix.sequence.saltBuckets</name> + <value>2</value> + </property> + + <property> + <name>phoenix.spool.directory</name> + <value>${hbase.tmp.dir}/phoenix-spool</value> + </property> + + <property> + <name>zookeeper.session.timeout</name> + <value>120000</value> + </property> + + <property> + <name>zookeeper.session.timeout.localHBaseCluster</name> + <value>120000</value> + </property> + + <property> + <name>zookeeper.znode.parent</name> + <value>/ams-hbase-unsecure</value> + </property> + + <property> + <name>hbase.use.dynamic.jars</name> + <value>false</value> + </property> + + </configuration> \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala new file mode 100644 index 0000000..8578a80 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.app + +import com.fasterxml.jackson.module.scala._ +import com.fasterxml.jackson.module.scala.deser.{ScalaNumberDeserializersModule, UntypedObjectDeserializerModule} +import com.fasterxml.jackson.module.scala.introspect.{ScalaAnnotationIntrospector, ScalaAnnotationIntrospectorModule} + +/** + * Extended Jackson Module that fixes the Scala-Jackson BytecodeReadingParanamer issue. + */ +class ADServiceScalaModule extends JacksonModule + with IteratorModule + with EnumerationModule + with OptionModule + with SeqModule + with IterableModule + with TupleModule + with MapModule + with SetModule + with FixedScalaAnnotationIntrospectorModule + with UntypedObjectDeserializerModule + with EitherModule { + + override def getModuleName = "ADServiceScalaModule" + + object ADServiceScalaModule extends ADServiceScalaModule + +} + + +trait FixedScalaAnnotationIntrospectorModule extends JacksonModule { + this += { _.appendAnnotationIntrospector(ScalaAnnotationIntrospector) } +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala index 8b3a829..2d0dbdf 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala @@ -21,6 +21,9 @@ import javax.ws.rs.Path import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter} import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap} +import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource} +import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService +import org.apache.ambari.metrics.adservice.service.ADQueryService import org.glassfish.jersey.filter.LoggingFilter import com.codahale.metrics.health.HealthCheck @@ -45,6 +48,11 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] { injector.instancesOfType(classOf[HealthCheck]).foreach { h => env.healthChecks.register(h.getClass.getName, h) } injector.instancesOfType(classOf[ContainerRequestFilter]).foreach { f => env.jersey().register(f) } injector.instancesOfType(classOf[ContainerResponseFilter]).foreach { f => env.jersey().register(f) } + + //Initialize Services + injector.getInstance(classOf[MetadataDatasource]).initialize + injector.getInstance(classOf[MetricDefinitionService]).initialize + injector.getInstance(classOf[ADQueryService]).initialize } env.jersey.register(jacksonJaxbJsonProvider) env.jersey.register(new LoggingFilter) @@ -53,7 +61,7 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] { private def jacksonJaxbJsonProvider: JacksonJaxbJsonProvider = { val provider = new JacksonJaxbJsonProvider() val objectMapper = new ObjectMapper() - objectMapper.registerModule(DefaultScalaModule) + objectMapper.registerModule(new ADServiceScalaModule) objectMapper.registerModule(new JodaModule) objectMapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false) objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala index 93f6b28..f9ed4b2 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala @@ -42,7 +42,7 @@ class AnomalyDetectionAppConfig extends Configuration { private val metricCollectorConfiguration = new MetricCollectorConfiguration /* - Anomaly Service configuration + Anomaly Query Service configuration */ @Valid private val adServiceConfiguration = new AdServiceConfiguration @@ -54,7 +54,7 @@ class AnomalyDetectionAppConfig extends Configuration { private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration /* - HBase Conf + AMS HBase Conf */ @JsonIgnore def getHBaseConf : org.apache.hadoop.conf.Configuration = { diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala index a896563..68e9df9 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala @@ -17,7 +17,7 @@ */ package org.apache.ambari.metrics.adservice.app -import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource} +import org.apache.ambari.metrics.adservice.db._ import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl} import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource} @@ -38,9 +38,10 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm bind(classOf[AnomalyResource]) bind(classOf[MetricDefinitionResource]) bind(classOf[RootResource]) - bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor]) + bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl]) bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl]) bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl]) bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource]) + bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor]) } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala index a51a959..a95ff15 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala @@ -28,6 +28,9 @@ object HBaseConfiguration { var isInitialized: Boolean = false val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration") + /** + * Initialize the hbase conf from hbase-site present in classpath. + */ def initConfigs(): Unit = { if (!isInitialized) { var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala similarity index 67% copy from ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala copy to ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala index 981a893..676b09a 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala @@ -16,14 +16,21 @@ * limitations under the License. */ -package org.apache.ambari.metrics.adservice.model +package org.apache.ambari.metrics.adservice.db -import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance -abstract class SingleMetricAnomalyInstance { +/** + * Trait for anomaly store accessor. (Phoenix) + */ +trait AdAnomalyStoreAccessor { + + def initialize(): Unit - val metricKey: MetricKey - val anomalyType: AnomalyType + def getMetricAnomalies(anomalyType: AnomalyType, + startTime: Long, + endTime: Long, + limit: Int) : List[MetricAnomalyInstance] -} + } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala new file mode 100644 index 0000000..7405459 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.db + +import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition +import org.apache.commons.lang.SerializationUtils + +import com.google.inject.Inject + +/** + * Implementation of the AdMetadataStoreAccessor. + * Serves as the adaptor between metric definition service and LevelDB worlds. + */ +class AdMetadataStoreAccessorImpl extends AdMetadataStoreAccessor { + + @Inject + var metadataDataSource: MetadataDatasource = _ + + @Inject + def this(metadataDataSource: MetadataDatasource) = { + this + this.metadataDataSource = metadataDataSource + } + + /** + * Return all saved component definitions from DB. + * + * @return + */ + override def getSavedInputDefinitions: List[MetricSourceDefinition] = { + val valuesFromStore : List[MetadataDatasource#Value] = metadataDataSource.getAll + val definitions = scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + for (value : Array[Byte] <- valuesFromStore) { + val definition : MetricSourceDefinition = SerializationUtils.deserialize(value).asInstanceOf[MetricSourceDefinition] + if (definition != null) { + definitions.+=(definition) + } + } + definitions.toList + } + + /** + * Save a set of component definitions + * + * @param metricSourceDefinitions Set of component definitions + * @return Success / Failure + */ + override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = { + for (definition <- metricSourceDefinitions) { + saveInputDefinition(definition) + } + true + } + + /** + * Save a component definition + * + * @param metricSourceDefinition component definition + * @return Success / Failure + */ + override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = { + val storeValue : MetadataDatasource#Value = SerializationUtils.serialize(metricSourceDefinition) + val storeKey : MetadataDatasource#Key = metricSourceDefinition.definitionName.getBytes() + metadataDataSource.put(storeKey, storeValue) + true + } + + /** + * Delete a component definition + * + * @param definitionName component definition + * @return + */ + override def removeInputDefinition(definitionName: String): Boolean = { + val storeKey : MetadataDatasource#Key = definitionName.getBytes() + metadataDataSource.delete(storeKey) + true + } +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala new file mode 100644 index 0000000..cc02ed4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.db + +import java.sql.Connection +import java.sql.SQLException + +/** + * Provides a connection to the anomaly store. + */ +trait ConnectionProvider { + @throws[SQLException] + def getConnection: Connection +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala new file mode 100644 index 0000000..d9396de --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.db + +import org.apache.commons.logging.LogFactory +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.ConnectionFactory +import org.apache.hadoop.hbase.client.HBaseAdmin +import java.io.IOException +import java.sql.Connection +import java.sql.DriverManager +import java.sql.SQLException + +object DefaultPhoenixDataSource { + private[db] val LOG = LogFactory.getLog(classOf[DefaultPhoenixDataSource]) + private val ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort" + private val ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum" + private val ZNODE_PARENT = "zookeeper.znode.parent" + private val connectionUrl = "jdbc:phoenix:%s:%s:%s" +} + +class DefaultPhoenixDataSource(var hbaseConf: Configuration) extends PhoenixConnectionProvider { + + val zookeeperClientPort: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_CLIENT_PORT, "2181") + val zookeeperQuorum: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_QUORUM) + val znodeParent: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZNODE_PARENT, "/ams-hbase-unsecure") + final private var url : String = _ + + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty) { + throw new IllegalStateException("Unable to find Zookeeper quorum to access HBase store using Phoenix.") + } + url = String.format(DefaultPhoenixDataSource.connectionUrl, zookeeperQuorum, zookeeperClientPort, znodeParent) + + + /** + * Get HBaseAdmin for table ops. + * + * @return @HBaseAdmin + * @throws IOException + */ + @throws[IOException] + override def getHBaseAdmin: HBaseAdmin = ConnectionFactory.createConnection(hbaseConf).getAdmin.asInstanceOf[HBaseAdmin] + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + @throws[SQLException] + override def getConnection: Connection = { + DefaultPhoenixDataSource.LOG.debug("Metric store connection url: " + url) + try DriverManager.getConnection(url) + catch { + case e: SQLException => + DefaultPhoenixDataSource.LOG.warn("Unable to connect to HBase store using Phoenix.", e) + throw e + } + } + +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala deleted file mode 100644 index baad57d..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.ambari.metrics.adservice.db - -import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition - -import com.google.inject.Inject - -class LevelDbStoreAccessor extends AdMetadataStoreAccessor{ - - @Inject - var levelDbDataSource : MetadataDatasource = _ - - @Inject - def this(levelDbDataSource: MetadataDatasource) = { - this - this.levelDbDataSource = levelDbDataSource - } - - /** - * Return all saved component definitions from DB. - * - * @return - */ - override def getSavedInputDefinitions: List[MetricSourceDefinition] = { - List.empty[MetricSourceDefinition] - } - - /** - * Save a set of component definitions - * - * @param metricSourceDefinitions Set of component definitions - * @return Success / Failure - */ -override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = { - true -} - - /** - * Save a component definition - * - * @param metricSourceDefinition component definition - * @return Success / Failure - */ - override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = { - true - } - - /** - * Delete a component definition - * - * @param definitionName component definition - * @return - */ - override def removeInputDefinition(definitionName: String): Boolean = { - true - } -} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala index aa6694a..7b223a2 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala @@ -44,6 +44,12 @@ trait MetadataDatasource { */ def get(key: Key): Option[Value] + /** + * This function obtains all the values + * + * @return the list of values + */ + def getAll: List[Value] /** * This function associates a key to a value, overwriting if necessary diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala index 36aea21..147d1f7 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala @@ -23,48 +23,60 @@ import java.util.concurrent.TimeUnit.SECONDS import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig import org.apache.ambari.metrics.adservice.common._ import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration -import org.apache.ambari.metrics.adservice.metadata.MetricKey +import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey} import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, MetricAnomalyInstance} import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance import org.apache.hadoop.hbase.util.RetryCounterFactory -import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider} +import org.slf4j.{Logger, LoggerFactory} import com.google.inject.Inject -object PhoenixAnomalyStoreAccessor { +/** + * Phoenix query handler class. + */ +class PhoenixAnomalyStoreAccessor extends AdAnomalyStoreAccessor { @Inject var configuration: AnomalyDetectionAppConfig = _ + @Inject + var metricDefinitionService: MetricDefinitionService = _ + var datasource: PhoenixConnectionProvider = _ + val LOG : Logger = LoggerFactory.getLogger(classOf[PhoenixAnomalyStoreAccessor]) - def initAnomalyMetricSchema(): Unit = { + @Override + def initialize(): Unit = { - val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf) + datasource = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf) val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt) val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl try { - var conn = datasource.getConnectionRetryingOnException(retryCounterFactory) + var conn : Connection = getConnectionRetryingOnException(retryCounterFactory) var stmt = conn.createStatement + //Create Method parameters table. val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE, PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME) stmt.executeUpdate(methodParametersSql) + //Create Point in Time anomaly table val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL, PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME, ttl.asInstanceOf[Object]) stmt.executeUpdate(pointInTimeAnomalySql) + //Create Trend Anomaly table val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL, PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME, ttl.asInstanceOf[Object]) stmt.executeUpdate(trendAnomalySql) + //Create model snapshot table. val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE, PhoenixQueryConstants.MODEL_SNAPSHOT) stmt.executeUpdate(snapshotSql) @@ -75,11 +87,9 @@ object PhoenixAnomalyStoreAccessor { } } - @throws[SQLException] - def getConnection: Connection = datasource.getConnection - - def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = { - val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance] + @Override + def getMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : List[MetricAnomalyInstance] = { + val anomalies = scala.collection.mutable.MutableList.empty[MetricAnomalyInstance] val conn : Connection = getConnection var stmt : PreparedStatement = null var rs : ResultSet = null @@ -98,8 +108,8 @@ object PhoenixAnomalyStoreAccessor { val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE") val modelSnapshot: String = rs.getString("MODEL_PARAMETERS") - val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO - val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp, + val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid) + val anomalyInstance: MetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp, metricValue, methodType, anomalyScore, season, modelSnapshot) anomalies.+=(anomalyInstance) } @@ -115,8 +125,8 @@ object PhoenixAnomalyStoreAccessor { val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE") val modelSnapshot: String = rs.getString("MODEL_PARAMETERS") - val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO - val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey, + val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid) + val anomalyInstance: MetricAnomalyInstance = TrendAnomalyInstance(metricKey, TimeRange(anomalyStart, anomalyEnd), TimeRange(referenceStart, referenceEnd), methodType, anomalyScore, season, modelSnapshot) @@ -127,11 +137,11 @@ object PhoenixAnomalyStoreAccessor { case e: SQLException => throw e } - anomalies + anomalies.toList } @throws[SQLException] - def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = { + private def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = { val sb = new StringBuilder @@ -145,11 +155,11 @@ object PhoenixAnomalyStoreAccessor { var stmt: java.sql.PreparedStatement = null try { stmt = connection.prepareStatement(sb.toString) - var pos = 1 - pos += 1 + var pos = 1 stmt.setLong(pos, startTime) + pos += 1 stmt.setLong(pos, endTime) stmt.setFetchSize(limit) @@ -157,9 +167,32 @@ object PhoenixAnomalyStoreAccessor { } catch { case e: SQLException => if (stmt != null) - stmt + return stmt throw e } stmt } + + @throws[SQLException] + private def getConnection: Connection = datasource.getConnection + + @throws[SQLException] + @throws[InterruptedException] + private def getConnectionRetryingOnException (retryCounterFactory : RetryCounterFactory) : Connection = { + val retryCounter = retryCounterFactory.create + while(true) { + try + return getConnection + catch { + case e: SQLException => + if (!retryCounter.shouldRetry) { + LOG.error("HBaseAccessor getConnection failed after " + retryCounter.getMaxAttempts + " attempts") + throw e + } + } + retryCounter.sleepUntilNextRetry() + } + null + } + } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala new file mode 100644 index 0000000..1faf1ba --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.db + +import org.apache.hadoop.hbase.client.HBaseAdmin +import java.io.IOException + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +trait PhoenixConnectionProvider extends ConnectionProvider { + /** + * Get HBaseAdmin for the Phoenix connection + * + * @return + * @throws IOException + */ + @throws[IOException] + def getHBaseAdmin: HBaseAdmin +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala index 5379c91..d9774e0 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala @@ -54,25 +54,25 @@ object PhoenixQueryConstants { val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + "METRIC_UUID BINARY(20) NOT NULL, " + + "METHOD_NAME VARCHAR, " + "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " + "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " + "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " + "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " + - "METHOD_NAME VARCHAR, " + "SEASONAL_INFO VARCHAR, " + "ANOMALY_SCORE DOUBLE, " + "MODEL_PARAMETERS VARCHAR, " + "DETECTION_TIME UNSIGNED_LONG " + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " + - "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + - "METRIC_UUID BINARY(20), " + + "METRIC_UUID BINARY(20) NOT NULL, " + "METHOD_NAME VARCHAR, " + "METHOD_TYPE VARCHAR, " + - "PARAMETERS VARCHAR " + - "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " + - "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " + + "PARAMETERS VARCHAR, " + + "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, SNAPSHOT_TIME)) " + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala index a34a60a..49ef272 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala @@ -42,7 +42,6 @@ class LevelDBDataSource() extends MetadataDatasource { def this(appConfig: AnomalyDetectionAppConfig) = { this this.appConfig = appConfig - initialize() } override def initialize(): Unit = { @@ -83,6 +82,22 @@ class LevelDBDataSource() extends MetadataDatasource { override def get(key: Key): Option[Value] = Option(db.get(key)) /** + * This function obtains all the values + * + * @return the list of values + */ + def getAll: List[Value] = { + val values = scala.collection.mutable.MutableList.empty[Value] + val iterator = db.iterator() + iterator.seekToFirst() + while (iterator.hasNext) { + val entry: java.util.Map.Entry[Key, Value] = iterator.next() + values.+=(entry.getValue) + } + values.toList + } + + /** * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs. * * @param toRemove which includes all the keys to be removed from the DataSource. diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala index 95b1b63..c277221 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala @@ -17,15 +17,17 @@ package org.apache.ambari.metrics.adservice.metadata -import java.net.{HttpURLConnection, URL} +import javax.ws.rs.core.Response import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration import org.apache.commons.lang.StringUtils -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey +import org.slf4j.{Logger, LoggerFactory} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper +import scalaj.http.{Http, HttpRequest, HttpResponse} + /** * Class to invoke Metrics Collector metadata API. * TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata. @@ -36,6 +38,7 @@ class ADMetadataProvider extends MetricMetadataProvider { var metricCollectorPort: String = _ var metricCollectorProtocol: String = _ var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys" + val LOG : Logger = LoggerFactory.getLogger(classOf[ADMetadataProvider]) val connectTimeout: Int = 10000 val readTimeout: Int = 10000 @@ -52,10 +55,8 @@ class ADMetadataProvider extends MetricMetadataProvider { metricMetadataPath = configuration.getMetadataEndpoint } - override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, - Set[MetricKey]], Set[MetricKey]) = { + override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey] = { - val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]() val numDefinitions: Int = metricSourceDefinition.metricDefinitions.size val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] @@ -64,52 +65,79 @@ class ADMetadataProvider extends MetricMetadataProvider { for (host <- metricCollectorHosts) { val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef) if (metricKeys != null) { - keysMap += (metricDef -> metricKeys) - metricKeySet.++(metricKeys) + metricKeySet.++=(metricKeys) } } } } - (keysMap.toMap, metricKeySet.toSet) + metricKeySet.toSet } /** - * Make Metrics Collector REST API call to fetch keys. * - * @param url + * @param protocol + * @param host + * @param port + * @param path * @param metricDefinition * @return */ def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = { - val url: String = protocol + "://" + host + port + "/" + path + val url: String = protocol + "://" + host + ":" + port + path val mapper = new ObjectMapper() with ScalaObjectMapper + + if (metricDefinition.hosts == null || metricDefinition.hosts.isEmpty) { + val request: HttpRequest = Http(url) + .param("metricName", metricDefinition.metricName) + .param("appId", metricDefinition.appId) + makeHttpGetCall(request, mapper) + } else { + val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] + + for (h <- metricDefinition.hosts) { + val request: HttpRequest = Http(url) + .param("metricName", metricDefinition.metricName) + .param("appId", metricDefinition.appId) + .param("hostname", h) + + val metricKeys = makeHttpGetCall(request, mapper) + metricKeySet.++=(metricKeys) + } + metricKeySet.toSet + } + } + + private def makeHttpGetCall(request: HttpRequest, mapper: ObjectMapper): Set[MetricKey] = { + try { - val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection] - connection.setConnectTimeout(connectTimeout) - connection.setReadTimeout(readTimeout) - connection.setRequestMethod("GET") - val inputStream = connection.getInputStream - val content = scala.io.Source.fromInputStream(inputStream).mkString - if (inputStream != null) inputStream.close() - val metricKeySet: Set[MetricKey] = fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content)) - return metricKeySet + val result: HttpResponse[String] = request.asString + if (result.code == Response.Status.OK.getStatusCode) { + LOG.info("Successfully fetched metric keys from metrics collector") + val metricKeySet: java.util.Set[java.util.Map[String, String]] = mapper.readValue(result.body, + classOf[java.util.Set[java.util.Map[String, String]]]) + getMetricKeys(metricKeySet) + } else { + LOG.error("Got an error when trying to fetch metric key from metrics collector. Code = " + result.code + ", Message = " + result.body) + } } catch { - case _: java.io.IOException | _: java.net.SocketTimeoutException => // handle this + case _: java.io.IOException | _: java.net.SocketTimeoutException => LOG.error("Unable to fetch metric keys from Metrics collector for : " + request.toString) } - null + Set.empty[MetricKey] } - def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = { + + def getMetricKeys(timelineMetricKeys: java.util.Set[java.util.Map[String, String]]): Set[MetricKey] = { val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] val iter = timelineMetricKeys.iterator() while (iter.hasNext) { - val timelineMetricKey: TimelineMetricKey = iter.next() - val metricKey: MetricKey = MetricKey(timelineMetricKey.metricName, - timelineMetricKey.appId, - timelineMetricKey.instanceId, - timelineMetricKey.hostName, - timelineMetricKey.uuid) + val timelineMetricKey: java.util.Map[String, String] = iter.next() + val metricKey: MetricKey = MetricKey( + timelineMetricKey.get("metricName"), + timelineMetricKey.get("appId"), + timelineMetricKey.get("instanceId"), + timelineMetricKey.get("hostname"), + timelineMetricKey.get("uuid").getBytes()) metricKeySet.add(metricKey) } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala index cc66c90..3c8ea84 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala @@ -19,6 +19,8 @@ package org.apache.ambari.metrics.adservice.metadata import java.io.File +import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper @@ -30,15 +32,19 @@ object InputMetricDefinitionParser { return List.empty[MetricSourceDefinition] } val mapper = new ObjectMapper() with ScalaObjectMapper - - def metricSourceDefinitions: List[MetricSourceDefinition] = - for { - file <- getFilesInDirectory(directory) - definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file) - if definition != null - } yield definition - - metricSourceDefinitions + mapper.registerModule(new ADServiceScalaModule) + val metricSourceDefinitions: scala.collection.mutable.MutableList[MetricSourceDefinition] = + scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + for (file <- getFilesInDirectory(directory)) { + val source = scala.io.Source.fromFile(file) + val lines = try source.mkString finally source.close() + val definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](lines) + if (definition != null) { + metricSourceDefinitions.+=(definition) + } + } + metricSourceDefinitions.toList } private def getFilesInDirectory(directory: String): List[File] = { diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala index 036867b..c668dfa 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala @@ -19,6 +19,8 @@ package org.apache.ambari.metrics.adservice.metadata import org.apache.commons.lang3.StringUtils + +import com.fasterxml.jackson.annotation.JsonIgnore /* { "metric-name": "mem_free", diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala index 635dc60..52ce39e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala @@ -17,7 +17,9 @@ package org.apache.ambari.metrics.adservice.metadata -trait MetricDefinitionService { +import org.apache.ambari.metrics.adservice.service.AbstractADService + +trait MetricDefinitionService extends AbstractADService{ /** * Given a 'UUID', return the metric key associated with it. @@ -27,6 +29,12 @@ trait MetricDefinitionService { def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey /** + * Return all the definitions being tracked. + * @return Map of Metric Source Definition name to Metric Source Definition. + */ + def getDefinitions: List[MetricSourceDefinition] + + /** * Given a component definition name, return the definition associated with it. * @param name component definition name * @return @@ -61,4 +69,10 @@ trait MetricDefinitionService { */ def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition] + /** + * Return the mapping between definition name to set of metric keys. + * @return Map of Metric Source Definition to set of metric keys associated with it. + */ + def getMetricKeys: Map[String, Set[MetricKey]] + } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala index c34d2dd..b9b4a7c 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala @@ -32,31 +32,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { var configuration: AnomalyDetectionAppConfig = _ var metricMetadataProvider: MetricMetadataProvider = _ - var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map() - var metricKeys: Set[MetricKey] = Set.empty[MetricKey] - var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map() + val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map() + val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map() + val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey] @Inject def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = { this () adMetadataStoreAccessor = metadataStoreAccessor configuration = anomalyDetectionAppConfig - initializeService() } - def initializeService() : Unit = { - - //Create AD Metadata Schema - //TODO Make sure AD Metadata DB is initialized here. + @Override + def initialize() : Unit = { + LOG.info("Initializing Metric Definition Service...") //Initialize Metric Metadata Provider metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration) - loadMetricSourceDefinitions() - } - - def loadMetricSourceDefinitions() : Unit = { - //Load definitions from metadata store val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions for (definition <- definitionsFromStore) { @@ -71,14 +64,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { //Union the 2 sources, with DB taking precedence. //Save new definition list to DB. - metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore)) + metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore)) - //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back (Map<MD,Set<MK>>, Set<MK>) + //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK> for (definition <- metricSourceDefinitionMap.values) { - val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition) - metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap) - metricKeys = metricKeys.++(keys) + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) } + + LOG.info("Successfully initialized Metric Definition Service.") } def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = { @@ -92,16 +87,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { } @Override + def getDefinitions: List[MetricSourceDefinition] = { + metricSourceDefinitionMap.values.toList + } + + @Override def getDefinitionByName(name: String): MetricSourceDefinition = { if (!metricSourceDefinitionMap.contains(name)) { LOG.warn("Metric Source Definition with name " + name + " not found") + null + } else { + metricSourceDefinitionMap.apply(name) } - metricSourceDefinitionMap.apply(name) } @Override def addDefinition(definition: MetricSourceDefinition): Boolean = { if (metricSourceDefinitionMap.contains(definition.definitionName)) { + LOG.info("Definition with name " + definition.definitionName + " already present.") return false } definition.definitionSource = MetricSourceDefinitionType.API @@ -109,6 +112,10 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) if (success) { metricSourceDefinitionMap += definition.definitionName -> definition + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) + LOG.info("Successfully created metric source definition : " + definition.definitionName) } success } @@ -116,16 +123,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { @Override def updateDefinition(definition: MetricSourceDefinition): Boolean = { if (!metricSourceDefinitionMap.contains(definition.definitionName)) { + LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found") return false } if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) { return false } + definition.definitionSource = MetricSourceDefinitionType.API val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition) if (success) { metricSourceDefinitionMap += definition.definitionName -> definition + val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition) + metricDefinitionMetricKeyMap(definition) = keys + metricKeys.++=(keys) + LOG.info("Successfully updated metric source definition : " + definition.definitionName) } success } @@ -133,17 +146,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { @Override def deleteDefinitionByName(name: String): Boolean = { if (!metricSourceDefinitionMap.contains(name)) { + LOG.warn("Metric Source Definition with name " + name + " not found") return false } val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name) if (definition.definitionSource != MetricSourceDefinitionType.API) { + LOG.warn("Cannot delete metric source definition which was not created through API.") return false } val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name) if (success) { - metricSourceDefinitionMap += definition.definitionName -> definition + metricSourceDefinitionMap -= definition.definitionName + metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition)) + metricDefinitionMetricKeyMap -= definition + LOG.info("Successfully deleted metric source definition : " + name) } success } @@ -183,7 +201,6 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { this.adMetadataStoreAccessor = adMetadataStoreAccessor } - /** * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId & * hosts to Metric definition if they do not have an override. @@ -202,7 +219,7 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { } } - if (metricDef.isValid && metricDef.hosts.isEmpty) { + if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) { if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) { metricDef.hosts = sourceLevelHostList } @@ -210,4 +227,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { } } + /** + * Return the mapping between definition name to set of metric keys. + * + * @return Map of Metric Source Definition to set of metric keys associated with it. + */ + override def getMetricKeys: Map[String, Set[MetricKey]] = { + val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map() + for (definition <- metricSourceDefinitionMap.values) { + metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition) + } + metricKeyMap.toMap + } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala index afad617..65c496e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala @@ -18,6 +18,9 @@ package org.apache.ambari.metrics.adservice.metadata +import javax.xml.bind.annotation.XmlRootElement + +@XmlRootElement case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) { @Override diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala index 5f9c0a0..b5ba15e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala @@ -27,5 +27,5 @@ trait MetricMetadataProvider { * @param metricSourceDefinition component definition * @return */ - def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey]) + def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey] } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala similarity index 91% rename from ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala rename to ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala index 981a893..248a380 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala @@ -18,12 +18,15 @@ package org.apache.ambari.metrics.adservice.model +import javax.xml.bind.annotation.XmlRootElement + import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -abstract class SingleMetricAnomalyInstance { +@XmlRootElement +abstract class MetricAnomalyInstance { val metricKey: MetricKey val anomalyType: AnomalyType -} +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala index 98ce0c4..db12307 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala @@ -19,17 +19,62 @@ package org.apache.ambari.metrics.adservice.resource import javax.ws.rs.core.MediaType.APPLICATION_JSON import javax.ws.rs.core.Response -import javax.ws.rs.{GET, Path, Produces} +import javax.ws.rs.{GET, Path, Produces, QueryParam} -import org.joda.time.DateTime +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.service.ADQueryService +import org.apache.commons.lang.StringUtils + +import com.google.inject.Inject @Path("/anomaly") class AnomalyResource { + @Inject + var aDQueryService: ADQueryService = _ + @GET @Produces(Array(APPLICATION_JSON)) - def default: Response = { - Response.ok.entity(Map("message" -> "Anomaly Detection Service!", - "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build() + def getTopNAnomalies(@QueryParam("type") anType: String, + @QueryParam("startTime") startTime: Long, + @QueryParam("endTime") endTime: Long, + @QueryParam("top") limit: Int): Response = { + + val anomalies: List[MetricAnomalyInstance] = aDQueryService.getTopNAnomaliesByType( + parseAnomalyType(anType), + parseStartTime(startTime), + parseEndTime(endTime), + parseTop(limit)) + + Response.ok.entity(anomalies).build() + } + + private def parseAnomalyType(anomalyType: String) : AnomalyType = { + if (StringUtils.isEmpty(anomalyType)) { + return AnomalyType.POINT_IN_TIME + } + AnomalyType.withName(anomalyType.toUpperCase) + } + + private def parseStartTime(startTime: Long) : Long = { + if (startTime > 0l) { + return startTime + } + System.currentTimeMillis() - 60*60*1000 + } + + private def parseEndTime(endTime: Long) : Long = { + if (endTime > 0l) { + return endTime + } + System.currentTimeMillis() + } + + private def parseTop(limit: Int) : Int = { + if (limit > 0) { + return limit + } + 5 } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala index 16125fa..442bf46 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala @@ -17,10 +17,11 @@ package org.apache.ambari.metrics.adservice.resource -import javax.ws.rs.{GET, Path, Produces} +import javax.ws.rs._ import javax.ws.rs.core.MediaType.APPLICATION_JSON +import javax.ws.rs.core.Response -import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricSourceDefinition} +import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey, MetricSourceDefinition} import org.apache.commons.lang.StringUtils import com.google.inject.Inject @@ -33,8 +34,76 @@ class MetricDefinitionResource { @GET @Produces(Array(APPLICATION_JSON)) - def getMetricDefinition (definitionName: String) : MetricSourceDefinition = { - null + @Path("/{name}") + def defaultGet(@PathParam("name") definitionName: String): Response = { + + if (StringUtils.isEmpty(definitionName)) { + Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build() + } + val metricSourceDefinition = metricDefinitionService.getDefinitionByName(definitionName) + if (metricSourceDefinition != null) { + Response.ok.entity(metricSourceDefinition).build() + } else { + Response.ok.entity(Map("message" -> "Definition not found")).build() + } + } + + @GET + @Produces(Array(APPLICATION_JSON)) + def getAllMetricDefinitions: Response = { + val metricSourceDefinitionMap: List[MetricSourceDefinition] = metricDefinitionService.getDefinitions + Response.ok.entity(metricSourceDefinitionMap).build() + } + + @GET + @Path("/keys") + @Produces(Array(APPLICATION_JSON)) + def getMetricKeys: Response = { + val metricKeyMap: Map[String, Set[MetricKey]] = metricDefinitionService.getMetricKeys + Response.ok.entity(metricKeyMap).build() } + @POST + @Produces(Array(APPLICATION_JSON)) + def defaultPost(definition: MetricSourceDefinition) : Response = { + if (definition == null) { + Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build() + } + val success : Boolean = metricDefinitionService.addDefinition(definition) + if (success) { + Response.ok.entity(Map("message" -> "Definition saved")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be saved")).build() + } + } + + @PUT + @Produces(Array(APPLICATION_JSON)) + def defaultPut(definition: MetricSourceDefinition) : Response = { + if (definition == null) { + Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build() + } + val success : Boolean = metricDefinitionService.updateDefinition(definition) + if (success) { + Response.ok.entity(Map("message" -> "Definition updated")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be updated")).build() + } + } + + @DELETE + @Produces(Array(APPLICATION_JSON)) + @Path("/{name}") + def defaultDelete(@PathParam("name") definitionName: String): Response = { + + if (StringUtils.isEmpty(definitionName)) { + Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build() + } + val success: Boolean = metricDefinitionService.deleteDefinitionByName(definitionName) + if (success) { + Response.ok.entity(Map("message" -> "Definition deleted")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be deleted")).build() + } + } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala index 22fe0ac..fd55b64 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala @@ -17,6 +17,8 @@ */ package org.apache.ambari.metrics.adservice.resource +import java.time.LocalDateTime + import javax.ws.rs.core.MediaType.APPLICATION_JSON import javax.ws.rs.core.Response import javax.ws.rs.{GET, Path, Produces} @@ -29,7 +31,8 @@ class RootResource { @Produces(Array(APPLICATION_JSON)) @GET def default: Response = { - Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build() + val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm") + Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> LocalDateTime.now)).build() } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala index 8e6f511..2cfa30f 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala @@ -18,9 +18,9 @@ package org.apache.ambari.metrics.adservice.service import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance +import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance -trait ADQueryService { +trait ADQueryService extends AbstractADService{ /** * API to return list of single metric anomalies satisfying a set of conditions from the anomaly store. @@ -30,5 +30,5 @@ trait ADQueryService { * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score. * @return */ - def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] + def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance] } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala index e5efa44..3b49208 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala @@ -16,11 +16,30 @@ * limitations under the License. */ package org.apache.ambari.metrics.adservice.service +import org.apache.ambari.metrics.adservice.db.AdAnomalyStoreAccessor import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance +import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance +import org.slf4j.{Logger, LoggerFactory} +import com.google.inject.{Inject, Singleton} + +@Singleton class ADQueryServiceImpl extends ADQueryService { + val LOG : Logger = LoggerFactory.getLogger(classOf[ADQueryServiceImpl]) + + @Inject + var adAnomalyStoreAccessor: AdAnomalyStoreAccessor = _ + + /** + * Initialize Service + */ + override def initialize(): Unit = { + LOG.info("Initializing AD Query Service...") + adAnomalyStoreAccessor.initialize() + LOG.info("Successfully initialized AD Query Service.") + } + /** * Implementation to return list of anomalies satisfying a set of conditions from the anomaly store. * @@ -30,8 +49,8 @@ class ADQueryServiceImpl extends ADQueryService { * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score. * @return */ - override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] = { - val anomalies = List.empty[SingleMetricAnomalyInstance] + override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance] = { + val anomalies = adAnomalyStoreAccessor.getMetricAnomalies(anomalyType, startTime, endTime, limit) anomalies } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala new file mode 100644 index 0000000..56bb999 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +trait AbstractADService { + + /** + * Initialize Service + */ + def initialize(): Unit + +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala index 63cf8c7..56ca2c1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala @@ -23,7 +23,7 @@ import org.apache.ambari.metrics.adservice.common.Season import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} class PointInTimeAnomalyInstance(val metricKey: MetricKey, val timestamp: Long, @@ -31,7 +31,7 @@ class PointInTimeAnomalyInstance(val metricKey: MetricKey, val methodType: AnomalyDetectionMethod, val anomalyScore: Double, val anomalousSeason: Season, - val modelParameters: String) extends SingleMetricAnomalyInstance { + val modelParameters: String) extends MetricAnomalyInstance { override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala index 3fc0d6f..7392d59 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala @@ -21,7 +21,7 @@ import org.apache.ambari.metrics.adservice.common.{Season, TimeRange} import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} case class TrendAnomalyInstance (metricKey: MetricKey, anomalousPeriod: TimeRange, @@ -29,7 +29,7 @@ case class TrendAnomalyInstance (metricKey: MetricKey, methodType: AnomalyDetectionMethod, anomalyScore: Double, seasonInfo: Season, - modelParameters: String) extends SingleMetricAnomalyInstance { + modelParameters: String) extends MetricAnomalyInstance { override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala index 2a4999c..e38ea40 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala @@ -17,6 +17,8 @@ */ package org.apache.ambari.metrics.adservice.app +import java.time.LocalDateTime + import javax.ws.rs.client.Client import javax.ws.rs.core.MediaType.APPLICATION_JSON @@ -37,7 +39,8 @@ class DefaultADResourceSpecTest extends FunSpec with Matchers { withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yml").getPath) { rule => val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly") .request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String]) - val now = DateTime.now.toString("MM-dd-yyyy hh:mm") + val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm") + val now = LocalDateTime.now assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}") } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala index bd38e9a..79366b1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala @@ -18,26 +18,32 @@ package org.apache.ambari.metrics.adservice.metadata +import java.util + import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey import org.scalatest.FunSuite class AMSMetadataProviderTest extends FunSuite { test("testFromTimelineMetricKey") { - val timelineMetricKeys: java.util.Set[TimelineMetricKey] = new java.util.HashSet[TimelineMetricKey]() + val timelineMetricKeys: java.util.Set[java.util.Map[String, String]] = new java.util.HashSet[java.util.Map[String, String]]() val uuid: Array[Byte] = Array.empty[Byte] for (i <- 1 to 3) { - val key: TimelineMetricKey = new TimelineMetricKey("M" + i, "App", null, "H", uuid) - timelineMetricKeys.add(key) + val keyMap: java.util.Map[String, String] = new util.HashMap[String, String]() + keyMap.put("metricName", "M" + i) + keyMap.put("appId", "App") + keyMap.put("hostname", "H") + keyMap.put("uuid", new String(uuid)) + timelineMetricKeys.add(keyMap) } val aMSMetadataProvider : ADMetadataProvider = new ADMetadataProvider(new MetricCollectorConfiguration) - val metricKeys : Set[MetricKey] = aMSMetadataProvider.fromTimelineMetricKey(timelineMetricKeys) + val metricKeys : Set[MetricKey] = aMSMetadataProvider.getMetricKeys(timelineMetricKeys) assert(metricKeys.size == 3) } + } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala index 0149673..c4d4dbc 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala @@ -20,6 +20,10 @@ package org.apache.ambari.metrics.adservice.metadata import org.apache.commons.lang.SerializationUtils import org.scalatest.FunSuite +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule + class MetricSourceDefinitionTest extends FunSuite { test("createNewMetricSourceDefinition") { @@ -65,7 +69,12 @@ class MetricSourceDefinitionTest extends FunSuite { } test("serializeDeserialize") { - val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "A1", MetricSourceDefinitionType.API) + msd.hosts = List("h1") + msd.addMetricDefinition(MetricDefinition("M1", null, List("h2"))) + msd.addMetricDefinition(MetricDefinition("M1", "A2", null)) + val msdByteArray: Array[Byte] = SerializationUtils.serialize(msd) assert(msdByteArray.nonEmpty) @@ -73,5 +82,10 @@ class MetricSourceDefinitionTest extends FunSuite { assert(msd2 != null) assert(msd == msd2) + val mapper : ObjectMapper = new ObjectMapper() + mapper.registerModule(new ADServiceScalaModule) + + System.out.print(mapper.writeValueAsString(msd)) + } } diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 34bf5cb..af68ed9 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -27,12 +27,6 @@ <artifactId>ambari-metrics-common</artifactId> <name>Ambari Metrics Common</name> - <properties> - <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> - <hbase.version>1.1.2.2.6.0.3-8</hbase.version> - <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version> - </properties> - <build> <plugins> <plugin> @@ -143,45 +137,6 @@ <dependencies> <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <version>${phoenix.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> <version>2.10.0</version> diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java deleted file mode 100644 index 7619811..0000000 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.commons.lang.StringUtils; - -public class TimelineMetricKey { - public String metricName; - public String appId; - public String instanceId = null; - public String hostName; - public byte[] uuid; - - public TimelineMetricKey(String metricName, String appId, String instanceId, String hostName, byte[] uuid) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.hostName = hostName; - this.uuid = uuid; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineMetricKey that = (TimelineMetricKey) o; - - if (!metricName.equals(that.metricName)) return false; - if (!appId.equals(that.appId)) return false; - if (!hostName.equals(that.hostName)) return false; - return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); - } - - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (hostName != null ? hostName.hashCode() : 0); - return result; - } - -} diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index a96be30..c2e9448 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -54,7 +55,6 @@ import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -496,30 +496,44 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time * @param metricName * @param appId * @param instanceId - * @param hostname + * @param hosts * @return * @throws SQLException * @throws IOException */ @Override - public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) + throws SQLException, IOException { + Set<Map<String, String>> timelineMetricKeys = new HashSet<>(); - if (StringUtils.isEmpty(hostname)) { - Set<String> hosts = new HashSet<>(); + if (CollectionUtils.isEmpty(hosts)) { + Set<String> hostsFromMetadata = new HashSet<>(); for (String host : metricMetadataManager.getHostedAppsCache().keySet()) { if (metricMetadataManager.getHostedAppsCache().get(host).getHostedApps().contains(appId)) { - hosts.add(host); + hostsFromMetadata.add(host); } } - Set<TimelineMetricKey> timelineMetricKeys = new HashSet<>(); - for (String host : hosts) { + for (String host : hostsFromMetadata) { byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host); - timelineMetricKeys.add(new TimelineMetricKey(metricName, appId, instanceId, host, uuid)); + Map<String, String> keyMap = new HashMap<>(); + keyMap.put("metricName", metricName); + keyMap.put("appId", appId); + keyMap.put("hostname", host); + keyMap.put("uuid", new String(uuid)); + timelineMetricKeys.add(keyMap); } return timelineMetricKeys; } else { - byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, hostname); - return Collections.singleton(new TimelineMetricKey(metricName, appId, instanceId, hostname, uuid)); + for (String host : hosts) { + byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host); + Map<String, String> keyMap = new HashMap<>(); + keyMap.put("metricName", metricName); + keyMap.put("appId", appId); + keyMap.put("hostname", host); + keyMap.put("uuid", new String(uuid)); + timelineMetricKeys.add(keyMap); + } + return timelineMetricKeys; } } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 65b4614..0626e8e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; @@ -139,8 +140,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; @@ -210,7 +211,7 @@ public class PhoenixHBaseAccessor { private HashMap<String, String> tableTTL = new HashMap<>(); private final TimelineMetricConfiguration configuration; - private List<InternalMetricsSource> rawMetricsSources; + private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>(); public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) { this(TimelineMetricConfiguration.getInstance(), dataSource); @@ -459,6 +460,23 @@ public class PhoenixHBaseAccessor { return mapper.readValue(json, metricValuesTypeRef); } + private Connection getConnectionRetryingOnException() + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + /** * Get JDBC connection to HBase store. Assumption is that the hbase * configuration is present on the classpath and loaded by the caller into @@ -491,7 +509,7 @@ public class PhoenixHBaseAccessor { try { LOG.info("Initializing metrics schema..."); - conn = dataSource.getConnectionRetryingOnException(retryCounterFactory); + conn = getConnectionRetryingOnException(); stmt = conn.createStatement(); // Metadata diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index f00bd91..349ef83 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -21,7 +21,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; @@ -111,6 +110,6 @@ public interface TimelineMetricStore { TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException; - Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException; + Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException; } diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java similarity index 84% rename from ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java rename to ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java index 72e5fb5..391af27 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.metrics2.sink.timeline.query; +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; import org.apache.hadoop.hbase.util.RetryCounterFactory; @@ -28,5 +28,4 @@ import java.sql.SQLException; */ public interface ConnectionProvider { public Connection getConnection() throws SQLException; - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException; } diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java similarity index 84% rename from ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java rename to ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java index a28a433..67afe6b 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.metrics2.sink.timeline.query; +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; import org.apache.commons.logging.Log; @@ -89,20 +89,4 @@ public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { } } - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } } diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java similarity index 92% rename from ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java rename to ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java index 194c769..cacbcfb 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java @@ -1,4 +1,4 @@ -package org.apache.hadoop.metrics2.sink.timeline.query; +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; import org.apache.hadoop.hbase.client.HBaseAdmin; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index db35686..dc401e6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -27,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -69,7 +69,9 @@ import javax.xml.bind.annotation.XmlRootElement; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -516,7 +518,7 @@ public class TimelineWebServices { @GET @Path("/metrics/metadata/key") @Produces({ MediaType.APPLICATION_JSON }) - public Set<TimelineMetricKey> getTimelineMetricKey( + public Set<Map<String, String>> getTimelineMetricKey( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("metricName") String metricName, @@ -527,7 +529,11 @@ public class TimelineWebServices { init(res); try { - return timelineMetricStore.getTimelineMetricKey(metricName, appId, instanceId, hostname); + if (StringUtils.isEmpty(hostname)) { + return timelineMetricStore.getTimelineMetricKeys(metricName, appId, instanceId, Collections.EMPTY_LIST); + } else { + return timelineMetricStore.getTimelineMetricKeys(metricName, appId, instanceId, Arrays.asList(StringUtils.split(hostname, ","))); + } } catch (Exception e) { throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 7b70a80..03205e7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; import org.apache.zookeeper.ClientCnxn; import org.easymock.EasyMock; import org.junit.After; diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 9c55305..741bb3c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -224,17 +224,6 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { return connection; } - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - Connection connection = null; - try { - connection = DriverManager.getConnection(getUrl()); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - } - return connection; - } - }); } diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index 5d81faa..50ff656 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.phoenix.exception.PhoenixIOException; import org.easymock.EasyMock; @@ -96,10 +96,6 @@ public class PhoenixHBaseAccessorTest { return null; } - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - return null; - } }; accessor = new PhoenixHBaseAccessor(connectionProvider); @@ -256,11 +252,6 @@ public class PhoenixHBaseAccessorTest { public Connection getConnection() throws SQLException { return connection; } - - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - return connection; - } }; accessor = new PhoenixHBaseAccessor(connectionProvider); diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index 42175a7..de24c68 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -21,7 +21,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; @@ -127,7 +126,7 @@ public class TestTimelineMetricStore implements TimelineMetricStore { } @Override - public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException { return Collections.emptySet(); } -- To stop receiving notification emails like this one, please contact [email protected].
