http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala new file mode 100644 index 0000000..12bd7e4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerService.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.metadata + +trait MetricManagerService { + + /** + * Given a 'UUID', return the metric key associated with it. + * @param uuid UUID of the metric, as a byte array + * @return the metric key associated with the UUID + */ + def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey + + /** + * Given a component definition name, return the definition associated with it.
+ * @param name component definition name + * @return the definition registered under that name + */ + def getDefinitionByName(name: String) : MetricSourceDefinition + + /** + * Add a new definition. + * @param definition component definition to add + * @return true if the definition was added, false otherwise + */ + def addDefinition(definition: MetricSourceDefinition) : Boolean + + /** + * Update a component definition by name. Only definitions which were added by API can be modified through API. + * @param definition updated component definition; the definition to replace is identified by its definitionName + * @return true if the definition was updated, false otherwise + */ + def updateDefinition(definition: MetricSourceDefinition) : Boolean + + /** + * Delete a component definition by name. Only definitions which were added by API can be deleted through API. + * @param name component definition name + * @return true if the definition was deleted, false otherwise + */ + def deleteDefinitionByName(name: String) : Boolean + + /** + * Given an appId, return the list of definitions that are tracked for that appId. + * @param appId component definition appId + * @return list of definitions whose appId equals the given appId + */ + def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition] + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala new file mode 100644 index 0000000..ce02775 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceImpl.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ambari.metrics.adservice.metadata
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
+
+import com.google.inject.{Inject, Singleton}
+
+/**
+ * In-memory registry of metric source definitions, persisted through the AD metadata store accessor.
+ *
+ * On initialization the definitions are read from the metadata store and from the configured
+ * input definition directory, merged (store entries win), and the metric keys of every
+ * definition are fetched through the metric metadata provider.
+ */
+@Singleton
+class MetricManagerServiceImpl extends MetricManagerService {
+
+  // Accessor used to load, save and remove definitions.
+  @Inject
+  var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
+
+  var configuration: AnomalyDetectionAppConfig = _
+  var metricMetadataProvider: MetricMetadataProvider = _
+
+  // Definitions keyed by definition name.
+  var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map()
+  // Metric keys collected from the metadata provider for the loaded definitions.
+  var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
+  // Metric keys grouped by metric definition, as returned by the metadata provider.
+  var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
+
+  /**
+   * Creates the service from the application configuration and initializes it immediately.
+   *
+   * NOTE(review): initializeService() reads adMetadataStoreAccessor; with Guice, field injection
+   * presumably happens only after this constructor returns, so the accessor may still be null
+   * here - confirm, and prefer the two-argument constructor if so.
+   *
+   * @param anomalyDetectionAppConfig application configuration
+   */
+  @Inject
+  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = {
+    this ()
+    //TODO : Create AD Metadata instance here (or inject)
+    configuration = anomalyDetectionAppConfig
+    initializeService()
+  }
+
+  /**
+   * Creates the service with an explicit metadata store accessor and initializes it immediately.
+   *
+   * @param anomalyDetectionAppConfig application configuration
+   * @param adMetadataStoreAccessor   accessor used to load and persist definitions
+   */
+  def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, adMetadataStoreAccessor: AdMetadataStoreAccessor) = {
+    this ()
+    //TODO : Create AD Metadata instance here (or inject). Pass in Schema information.
+    configuration = anomalyDetectionAppConfig
+    this.adMetadataStoreAccessor = adMetadataStoreAccessor
+    initializeService()
+  }
+
+  /**
+   * Creates the metric metadata provider from the collector configuration and loads all definitions.
+   */
+  def initializeService() : Unit = {
+
+    //Create AD Metadata Schema
+    //TODO Make sure AD Metadata DB is initialized here.
+
+    //Initialize Metric Metadata Provider
+    metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)
+
+    loadMetricSourceDefinitions()
+  }
+
+  /**
+   * Loads definitions from the metadata store and from the config directory, merges them into
+   * metricSourceDefinitionMap and refreshes the metric key collections for every definition.
+   */
+  def loadMetricSourceDefinitions() : Unit = {
+
+    //Load definitions from metadata store
+    val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
+
+    //Load definitions from configs
+    val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig
+
+    //Union the 2 sources, with DB taking precedence.
+    //Save new definition list to DB.
+    metricSourceDefinitionMap = metricSourceDefinitionMap ++ combineDefinitionSources(definitionsFromConfig, definitionsFromStore)
+
+    //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back Map<MD,Set<MK>>
+    for (definition <- metricSourceDefinitionMap.values) {
+      val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey]) =
+        metricMetadataProvider.getMetricKeysForDefinitions(definition)
+      metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap ++ definitionKeyMap
+      metricKeys = metricKeys ++ keys
+    }
+  }
+
+  /**
+   * Looks up the metric key whose uuid bytes equal the given array.
+   *
+   * @param uuid UUID bytes to look for
+   * @return the matching metric key, or null when uuid is null or no key matches
+   */
+  override def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
+    if (uuid == null) {
+      return null
+    }
+    // Stop at the first match instead of scanning the whole set.
+    metricKeys.find(_.uuid.sameElements(uuid)).orNull
+  }
+
+  /**
+   * Returns the definition registered under the given name.
+   *
+   * @param name component definition name
+   * @return the definition stored under that name
+   * @throws NoSuchElementException if no definition with that name is registered (Map.apply)
+   */
+  override def getDefinitionByName(name: String): MetricSourceDefinition = {
+    metricSourceDefinitionMap.apply(name)
+  }
+
+  /**
+   * Adds a new definition, marks it as API-sourced and persists it.
+   *
+   * @param definition definition to add
+   * @return false if a definition with the same name already exists or the store rejected it, true otherwise
+   */
+  override def addDefinition(definition: MetricSourceDefinition): Boolean = {
+    if (metricSourceDefinitionMap.contains(definition.definitionName)) {
+      return false
+    }
+    definition.definitionSource = MetricSourceDefinitionType.API
+
+    val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
+    if (success) {
+      metricSourceDefinitionMap += definition.definitionName -> definition
+    }
+    success
+  }
+
+  /**
+   * Replaces an existing API-sourced definition with the given one.
+   *
+   * @param definition replacement definition; the entry to replace is found by its definitionName
+   * @return false if the name is unknown, the existing definition is not API-sourced, or the store
+   *         rejected the save; true otherwise
+   */
+  override def updateDefinition(definition: MetricSourceDefinition): Boolean = {
+    val existingIsFromApi: Boolean = metricSourceDefinitionMap.get(definition.definitionName)
+      .exists(_.definitionSource == MetricSourceDefinitionType.API)
+    if (!existingIsFromApi) {
+      return false
+    }
+    // Keep the replacement API-sourced (as addDefinition does); otherwise a definition whose
+    // source was left at its CONFIG default could never be updated or deleted through the API again.
+    definition.definitionSource = MetricSourceDefinitionType.API
+
+    val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
+    if (success) {
+      metricSourceDefinitionMap += definition.definitionName -> definition
+    }
+    success
+  }
+
+  /**
+   * Deletes an API-sourced definition from the store and from the in-memory map.
+   *
+   * @param name component definition name
+   * @return false if the name is unknown, the definition is not API-sourced, or the store
+   *         rejected the removal; true otherwise
+   */
+  override def deleteDefinitionByName(name: String): Boolean = {
+    if (!metricSourceDefinitionMap.contains(name)) {
+      return false
+    }
+
+    val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
+    if (definition.definitionSource != MetricSourceDefinitionType.API) {
+      return false
+    }
+
+    val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
+    if (success) {
+      // Remove the entry; the previous code re-added it here, so deleted definitions stayed visible.
+      metricSourceDefinitionMap -= name
+    }
+    success
+  }
+
+  /**
+   * Returns every registered definition whose appId equals the given one.
+   *
+   * @param appId appId to filter by
+   * @return list of matching definitions, empty when none match
+   */
+  override def getDefinitionByAppId(appId: String): List[MetricSourceDefinition] = {
+    metricSourceDefinitionMap.values.filter(_.appId == appId).toList
+  }
+
+  /**
+   * Merges config and store definitions by definition name. Store definitions take precedence;
+   * config definitions whose name is not in the store are saved to the store and included.
+   *
+   * @param configDefinitions definitions parsed from the config directory
+   * @param dbDefinitions     definitions already saved in the metadata store
+   * @return immutable map from definition name to definition
+   */
+  def combineDefinitionSources(configDefinitions: List[MetricSourceDefinition], dbDefinitions: List[MetricSourceDefinition])
+  : Map[String, MetricSourceDefinition] = {
+
+    val combinedDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] =
+      scala.collection.mutable.Map.empty[String, MetricSourceDefinition]
+
+    for (definitionFromDb <- dbDefinitions) {
+      combinedDefinitionMap(definitionFromDb.definitionName) = definitionFromDb
+    }
+
+    // Compare by name explicitly rather than through MetricSourceDefinition.equals.
+    val dbDefinitionNames: Set[String] = dbDefinitions.map(_.definitionName).toSet
+
+    for (definition <- configDefinitions) {
+      if (!dbDefinitionNames.contains(definition.definitionName)) {
+        // NOTE(review): the save result is ignored, so a config definition is tracked in memory
+        // even when persisting it fails - confirm this best-effort behaviour is intended.
+        adMetadataStoreAccessor.saveInputDefinition(definition)
+        combinedDefinitionMap(definition.definitionName) = definition
+      }
+    }
+    combinedDefinitionMap.toMap
+  }
+
+  /**
+   * Parses the definitions found in the configured input definition directory.
+   *
+   * @return definitions parsed from that directory
+   */
+  def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = {
+    val configDirectory = configuration.getMetricManagerServiceConfiguration.getInputDefinitionDirectory
+    InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory)
+  }
+
+  /**
+   * Replaces the metadata store accessor used by this service.
+   *
+   * @param adMetadataStoreAccessor accessor to use from now on
+   */
+  def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = {
+    this.adMetadataStoreAccessor = adMetadataStoreAccessor
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
----------------------------------------------------------------------
diff --git
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala new file mode 100644 index 0000000..5f9c0a0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.metadata + +/** + * Metadata provider for maintaining the metric information in the Metric Definition Service. + */ +trait MetricMetadataProvider { + + /** + * Return the set of Metric Keys for a given component definition.
+ * @param metricSourceDefinition component definition + * @return pair of (metric keys grouped by metric definition, set of metric keys); the second element is presumably the union of the first's values - confirm with implementations + */ + def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey]) +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala new file mode 100644 index 0000000..60198e0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinition.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ambari.metrics.adservice.metadata + +import javax.xml.bind.annotation.XmlRootElement + +import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper + +/* +{ + "definition-name": "host-memory", + "app-id" : "HOST", + "hosts" : ["c6401.ambari.apache.org"], + "metric-definitions" : [ + { + "metric-name": "mem_free", + "metric-description" : "Free memory on a Host.", + "troubleshooting-info" : "Sudden drop / hike in free memory on a host.", + "static-threshold" : 10, + "app-id" : "HOST" +} ], + + "related-definition-names" : ["host-cpu", "host-network"], + "anomaly-detection-subsystems" : ["point-in-time", "trend"] +} +*/ + +/* + +On Startup +Read input definitions directory, parse the JSONs +Create / Update the metric definitions in DB +Convert metric definitions to Map<MetricKey, MetricDefinition> + +What do we want to have in memory? +Map of Metric Key -> List<Component Definitions> + +What do we use metric definitions for? +Anomaly GET - Associate definition information as well. +Definition CRUD - Get definition given definition name +Get set of metrics that are being tracked +Return definition information for a metric key +Given a metric definition name, return set of metrics.
+
+*/
+
+/**
+ * A named set of metric definitions for one appId, together with the hosts, related
+ * definitions and anomaly subsystems associated with it.
+ *
+ * Two definitions are equal when their definition names are equal.
+ */
+@XmlRootElement
+class MetricSourceDefinition {
+
+  var definitionName: String = _
+  var appId: String = _
+  // Where the definition came from; CONFIG unless set otherwise.
+  var definitionSource: MetricSourceDefinitionType = MetricSourceDefinitionType.CONFIG
+  var hosts: List[String] = List.empty[String]
+  var relatedDefinitions: List[String] = List.empty[String]
+  var associatedAnomalySubsystems: List[AnomalyType] = List.empty[AnomalyType]
+
+  var metricDefinitions: scala.collection.mutable.MutableList[MetricDefinition] =
+    scala.collection.mutable.MutableList.empty[MetricDefinition]
+
+  /**
+   * Creates a definition with the given name, appId and source; all other fields keep their defaults.
+   *
+   * @param definitionName name identifying the definition
+   * @param appId          appId the definition belongs to
+   * @param source         source of the definition
+   */
+  def this(definitionName: String, appId: String, source: MetricSourceDefinitionType) = {
+    this()
+    this.definitionName = definitionName
+    this.appId = appId
+    this.definitionSource = source
+  }
+
+  /**
+   * Appends the metric definition unless an equal one is already present.
+   *
+   * @param metricDefinition metric definition to add
+   */
+  def addMetricDefinition(metricDefinition: MetricDefinition): Unit = {
+    if (!metricDefinitions.contains(metricDefinition)) {
+      metricDefinitions.+=(metricDefinition)
+    }
+  }
+
+  /**
+   * Removes every metric definition equal to the given one.
+   *
+   * @param metricDefinition metric definition to remove
+   */
+  def removeMetricDefinition(metricDefinition: MetricDefinition): Unit = {
+    metricDefinitions = metricDefinitions.filter(_ != metricDefinition)
+  }
+
+  /**
+   * Name-based equality. Returns false for null and for objects of any other type
+   * (the previous unchecked cast threw ClassCastException), and tolerates a null name.
+   *
+   * @param obj object to compare with
+   * @return true if obj is a MetricSourceDefinition with the same definitionName
+   */
+  override def equals(obj: scala.Any): Boolean = obj match {
+    case that: MetricSourceDefinition => definitionName == that.definitionName
+    case _ => false
+  }
+
+  /**
+   * Hash code consistent with equals: derived from definitionName only.
+   *
+   * @return hash of definitionName, or 0 when the name is null
+   */
+  override def hashCode(): Int = {
+    if (definitionName == null) 0 else definitionName.hashCode
+  }
+}
+
+/**
+ * JSON (de)serialization helpers for MetricSourceDefinition, using one shared Jackson
+ * mapper with the Scala module registered.
+ */
+object MetricSourceDefinition {
+  val mapper = new ObjectMapper() with ScalaObjectMapper
+  mapper.registerModule(DefaultScalaModule)
+
+  /**
+   * Serializes a definition to a JSON string.
+   *
+   * @param definition definition to serialize
+   * @return JSON text produced by the mapper
+   */
+  def serialize(definition: MetricSourceDefinition) : String = {
+    mapper.writeValueAsString(definition)
+  }
+
+  /**
+   * Parses a JSON string into a definition.
+   *
+   * @param definitionString JSON text of a definition
+   * @return the parsed definition
+   */
+  def deserialize(definitionString: String) : MetricSourceDefinition = {
+    mapper.readValue[MetricSourceDefinition](definitionString)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala
----------------------------------------------------------------------
diff --git
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala new file mode 100644 index 0000000..04ff95b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionType.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.metadata + +import javax.xml.bind.annotation.XmlRootElement + +@XmlRootElement +object MetricSourceDefinitionType extends Enumeration{ + type MetricSourceDefinitionType = Value + val CONFIG,API = Value /* CONFIG is the default source of a MetricSourceDefinition; API is set on definitions added through MetricManagerServiceImpl.addDefinition, and only API definitions may be updated or deleted there */ +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala new file mode 100644 index 0000000..81a7023 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyDetectionMethod.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ambari.metrics.adservice.model + +object AnomalyDetectionMethod extends Enumeration { + type AnomalyDetectionMethod = Value + val EMA, TUKEYS, KS, HSDEV, UNKOWN = Value /* NOTE(review): UNKOWN looks like a misspelling of UNKNOWN (AnomalyType spells it UNKNOWN); renaming it would change a public enum value, so confirm with callers first */ +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala new file mode 100644 index 0000000..817180e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/AnomalyType.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.ambari.metrics.adservice.model + +import javax.xml.bind.annotation.XmlRootElement + +@XmlRootElement +object AnomalyType extends Enumeration { + type AnomalyType = Value + val POINT_IN_TIME, TREND, UNKNOWN = Value /* anomaly categories; used as the filter argument of ADQueryService.getTopNAnomaliesByType. UNKNOWN is presumably a fallback value - confirm */ +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala new file mode 100644 index 0000000..981a893 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ambari.metrics.adservice.model + +import org.apache.ambari.metrics.adservice.metadata.MetricKey +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType + +abstract class SingleMetricAnomalyInstance { /* abstract description of an anomaly on a single metric; concrete subclasses must define both values below */ + + val metricKey: MetricKey + val anomalyType: AnomalyType + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala index fb9921a..c941ac3 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala @@ -17,9 +17,9 @@ */ package org.apache.ambari.metrics.adservice.resource -import javax.ws.rs.{GET, Path, Produces} -import javax.ws.rs.core.Response import javax.ws.rs.core.MediaType.APPLICATION_JSON +import javax.ws.rs.core.Response +import javax.ws.rs.{GET, Path, Produces} import org.joda.time.DateTime http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala new file mode 100644 index 0000000..aacea79 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.resource + +class MetricDefinitionResource { + + /* Placeholder: this class has no members yet. Operations listed for it: + GET component definition + POST component definition + DELETE component definition + PUT component definition + */ +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala index b92a145..22fe0ac 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala @@ -17,9 +17,9 @@ */ package org.apache.ambari.metrics.adservice.resource -import javax.ws.rs.{GET, Path, Produces} -import javax.ws.rs.core.Response import javax.ws.rs.core.MediaType.APPLICATION_JSON +import javax.ws.rs.core.Response +import javax.ws.rs.{GET, Path, Produces} import org.joda.time.DateTime http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala new file mode 100644 index
0000000..e7d7c9a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/SubsystemResource.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.resource + +class SubsystemResource { + + /* Placeholder: this class has no members yet. Operations listed for it: + GET / UPDATE - parameters (which subsystem, parameters) + POST - Update sensitivity of a subsystem (which subsystem, increase or decrease, factor) + */ +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala index 0161166..8e6f511 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala +++
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala @@ -17,6 +17,18 @@ */ package org.apache.ambari.metrics.adservice.service +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance + trait ADQueryService { + /** + * API to return list of single metric anomalies satisfying a set of conditions from the anomaly store. + * @param anomalyType Type of the anomaly (Point In Time / Trend) + * @param startTime Start of time range + * @param endTime End of time range + * @param limit Maximum number of anomaly metrics that need to be returned based on anomaly score. + * @return list of single metric anomalies satisfying the given conditions + */ + def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] } http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala index fe00f58..e5efa44 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala @@ -16,7 +16,22 @@ * limitations under the License.
*/ package org.apache.ambari.metrics.adservice.service +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance class ADQueryServiceImpl extends ADQueryService { + /** + * Implementation to return list of anomalies satisfying a set of conditions from the anomaly store. + * + * @param anomalyType Type of the anomaly (Point In Time / Trend) + * @param startTime Start of time range + * @param endTime End of time range + * @param limit Maximum number of anomaly metrics that need to be returned based on anomaly score. + * @return + */ + override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] = { + val anomalies = List.empty[SingleMetricAnomalyInstance] + anomalies + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala index 6122f5e..90c564e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala @@ -16,22 +16,6 @@ */ package org.apache.ambari.metrics.adservice.spark.prototype -import java.io.{FileInputStream, IOException, InputStream} -import java.util -import java.util.Properties -import 
java.util.logging.LogManager - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface -import org.apache.spark.SparkConf -import org.apache.spark.streaming._ -import org.apache.spark.streaming.kafka._ -import org.apache.ambari.metrics.adservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly} -import org.apache.ambari.metrics.adservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique} -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics -import org.apache.log4j.Logger -import org.apache.spark.storage.StorageLevel - object MetricAnomalyDetector { /* http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala index ac00764..466225f 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala @@ -17,11 +17,6 @@ package org.apache.ambari.metrics.adservice.spark.prototype -import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkConf, SparkContext} - object SparkPhoenixReader { def main(args: Array[String]) { 
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala new file mode 100644 index 0000000..63cf8c7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.subsystem.pointintime + +import java.util.Date + +import org.apache.ambari.metrics.adservice.common.Season +import org.apache.ambari.metrics.adservice.metadata.MetricKey +import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} + +class PointInTimeAnomalyInstance(val metricKey: MetricKey, + val timestamp: Long, + val metricValue: Double, + val methodType: AnomalyDetectionMethod, + val anomalyScore: Double, + val anomalousSeason: Season, + val modelParameters: String) extends SingleMetricAnomalyInstance { + + override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME + + private def anomalyToString : String = { + "Method=" + methodType + ", AnomalyScore=" + anomalyScore + ", Season=" + anomalousSeason.toString + + ", Model Parameters=" + modelParameters + } + + @Override + override def toString: String = { + "Metric : [" + metricKey.toString + ", Metric Value=" + metricValue + " @ Time = " + new Date(timestamp) + "], Anomaly : [" + anomalyToString + "]" + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala new file mode 100644 index 0000000..125da34 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala @@ -0,0 +1,32 @@ +package org.apache.ambari.metrics.adservice.subsystem.trend + +import org.apache.ambari.metrics.adservice.common.{Season, TimeRange} +import org.apache.ambari.metrics.adservice.metadata.MetricKey +import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} + +/** + * Single-metric anomaly instance of the trend subsystem: carries the metric key, the anomalous and reference time ranges, the detection method, the anomaly score, the season and the model parameters. + */ +case class TrendAnomalyInstance (metricKey: MetricKey, + anomalousPeriod: TimeRange, + referencePeriod: TimeRange, + methodType: AnomalyDetectionMethod, + anomalyScore: Double, + seasonInfo: Season, + modelParameters: String) extends SingleMetricAnomalyInstance { + + override val anomalyType: AnomalyType = AnomalyType.TREND + + private def anomalyToString : String = { + "Method=" + methodType + ", AnomalyScore=" + anomalyScore + ", Season=" + seasonInfo.toString + + ", Model Parameters=" + modelParameters + } + + @Override + override def toString: String = { + "Metric : [" + metricKey.toString + ", AnomalousPeriod=" + anomalousPeriod + ", ReferencePeriod=" + referencePeriod + + "], Anomaly : [" + anomalyToString + "]" + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java index 57a6f34..1077a9c 100644 --- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestTukeys.java @@ -17,9 +17,9 @@ */ package org.apache.ambari.metrics.adservice.prototype; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface; import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala new file mode 100644 index 0000000..8e3a949 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.app + +import java.io.File + +import javax.validation.Validator + +import org.scalatest.FunSuite + +import com.fasterxml.jackson.databind.ObjectMapper + +import io.dropwizard.configuration.YamlConfigurationFactory +import io.dropwizard.jackson.Jackson +import io.dropwizard.jersey.validation.Validators + +class AnomalyDetectionAppConfigTest extends FunSuite { + + test("testConfiguration") { + + val objectMapper: ObjectMapper = Jackson.newObjectMapper() + val validator: Validator = Validators.newValidator + val factory: YamlConfigurationFactory[AnomalyDetectionAppConfig] = + new YamlConfigurationFactory[AnomalyDetectionAppConfig](classOf[AnomalyDetectionAppConfig], validator, objectMapper, "") + + val classLoader = getClass.getClassLoader + val file = new File(classLoader.getResource("config.yml").getFile) + val config = factory.build(file) + + assert(config.isInstanceOf[AnomalyDetectionAppConfig]) + + assert(config.getMetricManagerServiceConfiguration.getInputDefinitionDirectory == "/etc/adservice/conf/input-definitions-directory") + + assert(config.getMetricCollectorConfiguration.getHostPortList == "host1:6188,host2:6188") + + assert(config.getAdServiceConfiguration.getAnomalyDataTtl == 604800) + } + +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala index c088855..65cf609 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala @@ -21,8 +21,8 @@ import javax.ws.rs.client.Client import javax.ws.rs.core.MediaType.APPLICATION_JSON import org.apache.ambari.metrics.adservice.app.DropwizardAppRuleHelper.withAppRunning -import org.glassfish.jersey.client.{ClientConfig, JerseyClientBuilder} import org.glassfish.jersey.client.ClientProperties.{CONNECT_TIMEOUT, READ_TIMEOUT} +import org.glassfish.jersey.client.{ClientConfig, JerseyClientBuilder} import org.glassfish.jersey.filter.LoggingFilter import org.glassfish.jersey.jaxb.internal.XmlJaxbElementProvider import org.joda.time.DateTime http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala deleted file mode 100644 index 40b9d6a..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.common - -import org.scalatest.FlatSpec - -import scala.collection.mutable - -class ADServiceConfigurationTest extends FlatSpec { - - "A Stack" should "pop values in last-in-first-out order" in { - val stack = new mutable.Stack[Int] - stack.push(1) - stack.push(2) - assert(stack.pop() === 2) - assert(stack.pop() === 1) - } - - it should "throw NoSuchElementException if an empty stack is popped" in { - val emptyStack = new mutable.Stack[String] - assertThrows[NoSuchElementException] { - emptyStack.pop() - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala new file mode 100644 index 0000000..b610b97 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/RangeTest.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.common + +import org.scalatest.FlatSpec + +class RangeTest extends FlatSpec { + + "A Range " should " return true for inner and boundary values" in { + val range : Range = Range(4,6) + assert(range.withinRange(5)) + assert(range.withinRange(6)) + assert(range.withinRange(4)) + assert(!range.withinRange(7)) + } + + it should "accept same lower and higher range values" in { + val range : Range = Range(4,4) + assert(range.withinRange(4)) + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala new file mode 100644 index 0000000..4d542e8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/SeasonTest.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.common + +import java.util.Calendar + +import org.scalatest.FunSuite + +class SeasonTest extends FunSuite { + + test("testBelongsTo") { + + //Create Season for weekdays. Mon to Friday and 9AM - 5PM + var season : Season = Season(Range(Calendar.MONDAY,Calendar.FRIDAY), Range(9,17)) + + //Try with a timestamp on a Monday, @ 9AM. + val c = Calendar.getInstance + c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0) + assert(season.belongsTo(c.getTimeInMillis)) + + c.set(2017, Calendar.OCTOBER, 30, 18, 0, 0) + assert(!season.belongsTo(c.getTimeInMillis)) + + //Try with a timestamp on a Sunday, @ 9AM. + c.set(2017, Calendar.OCTOBER, 29, 9, 0, 0) + assert(!season.belongsTo(c.getTimeInMillis)) + + //Create Season for Monday 11AM - 12Noon. + season = Season(Range(Calendar.MONDAY,Calendar.MONDAY), Range(11,12)) + c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0) + assert(!season.belongsTo(c.getTimeInMillis)) + + c.set(2017, Calendar.OCTOBER, 30, 11, 30, 0) + assert(season.belongsTo(c.getTimeInMillis)) + + + //Create Season from Friday to Monday and 9AM - 5PM + season = Season(Range(Calendar.FRIDAY,Calendar.MONDAY), Range(9,17)) + + //Try with a timestamp on a Monday, @ 9AM. + c.set(2017, Calendar.OCTOBER, 30, 9, 0, 0) + assert(season.belongsTo(c.getTimeInMillis)) + + //Try with a timestamp on a Sunday, @ 3PM. 
+ c.set(2017, Calendar.OCTOBER, 29, 15, 0, 0) + assert(season.belongsTo(c.getTimeInMillis)) + + //Try with a timestamp on a Wednesday, @ 9AM. + c.set(2017, Calendar.NOVEMBER, 1, 9, 0, 0) + assert(!season.belongsTo(c.getTimeInMillis)) + } + + test("testEquals") { + + var season1: Season = Season(Range(4,5), Range(2,3)) + var season2: Season = Season(Range(4,5), Range(2,3)) + assert(season1 == season2) + + var season3: Season = Season(Range(4,4), Range(2,3)) + assert(!(season1 == season3)) + } + + test("testSerialize") { + val season1 : Season = Season(Range(Calendar.MONDAY,Calendar.FRIDAY), Range(9,17)) + + val seasonString = Season.serialize(season1) + + val season2 : Season = Season.deserialize(seasonString) + assert(season1 == season2) + + val season3 : Season = Season(Range(Calendar.MONDAY,Calendar.THURSDAY), Range(9,17)) + assert(!(season2 == season3)) + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala new file mode 100644 index 0000000..bd38e9a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.metadata + +import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey +import org.scalatest.FunSuite + +class AMSMetadataProviderTest extends FunSuite { + + test("testFromTimelineMetricKey") { + val timelineMetricKeys: java.util.Set[TimelineMetricKey] = new java.util.HashSet[TimelineMetricKey]() + + val uuid: Array[Byte] = Array.empty[Byte] + + for (i <- 1 to 3) { + val key: TimelineMetricKey = new TimelineMetricKey("M" + i, "App", null, "H", uuid) + timelineMetricKeys.add(key) + } + + val aMSMetadataProvider : ADMetadataProvider = new ADMetadataProvider(new MetricCollectorConfiguration) + + val metricKeys : Set[MetricKey] = aMSMetadataProvider.fromTimelineMetricKey(timelineMetricKeys) + assert(metricKeys.size == 3) + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala new file mode 100644 index 0000000..8e19a0f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricManagerServiceTest.scala @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.metadata + +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor +import org.easymock.EasyMock.{anyObject, expect, expectLastCall, replay} +import org.scalatest.FunSuite +import org.scalatest.easymock.EasyMockSugar + +class MetricManagerServiceTest extends FunSuite { + + test("testAddDefinition") { + + val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + for (i <- 1 to 3) { + val msd1 : MetricSourceDefinition = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API) + definitions.+=(msd1) + } + + val newDef : MetricSourceDefinition = new MetricSourceDefinition("NewDefinition", "testAppId", MetricSourceDefinitionType.API) + + val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor] + expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once() + expect(adMetadataStoreAccessor.saveInputDefinition(newDef)).andReturn(true).once() + replay(adMetadataStoreAccessor) + + val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor) + + metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor) + + metricManagerService.addDefinition(newDef) + + assert(metricManagerService.metricSourceDefinitionMap.size == 4) + assert(metricManagerService.getDefinitionByName("NewDefinition") != null) + } + + test("testGetDefinitionByName") { + val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + for (i <- 1 to 3) { + val msd1 : MetricSourceDefinition = new MetricSourceDefinition("TestDefinition" + i, "testAppId", 
MetricSourceDefinitionType.API) + definitions.+=(msd1) + } + + val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor] + expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once() + replay(adMetadataStoreAccessor) + + val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor) + + metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor) + for (i <- 1 to 3) { + val definition: MetricSourceDefinition = metricManagerService.getDefinitionByName("TestDefinition" + i) + assert(definition != null) + } + } + + test("testGetDefinitionByAppId") { + val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + for (i <- 1 to 3) { + var msd1 : MetricSourceDefinition = null + if (i == 2) { + msd1 = new MetricSourceDefinition("TestDefinition" + i, null, MetricSourceDefinitionType.API) + } else { + msd1 = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API) + } + definitions.+=(msd1) + } + + val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor] + expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once() + replay(adMetadataStoreAccessor) + + val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor) + + metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor) + val definitionsByAppId: List[MetricSourceDefinition] = metricManagerService.getDefinitionByAppId("testAppId") + assert(definitionsByAppId.size == 2) + } + + test("testDeleteDefinitionByName") { + val definitions : scala.collection.mutable.MutableList[MetricSourceDefinition] = scala.collection.mutable.MutableList.empty[MetricSourceDefinition] + + 
for (i <- 1 to 3) { + var msd1 : MetricSourceDefinition = null + if (i == 2) { + msd1 = new MetricSourceDefinition("TestDefinition" + i, null, MetricSourceDefinitionType.CONFIG) + } else { + msd1 = new MetricSourceDefinition("TestDefinition" + i, "testAppId", MetricSourceDefinitionType.API) + } + definitions.+=(msd1) + } + + val adMetadataStoreAccessor: AdMetadataStoreAccessor = EasyMockSugar.niceMock[AdMetadataStoreAccessor] + expect(adMetadataStoreAccessor.getSavedInputDefinitions).andReturn(definitions.toList).once() + expect(adMetadataStoreAccessor.removeInputDefinition(anyObject[String])).andReturn(true).times(2) + replay(adMetadataStoreAccessor) + + val metricManagerService: MetricManagerServiceImpl = new MetricManagerServiceImpl(new AnomalyDetectionAppConfig, adMetadataStoreAccessor) + + metricManagerService.setAdMetadataStoreAccessor(adMetadataStoreAccessor) + + var success: Boolean = metricManagerService.deleteDefinitionByName("TestDefinition1") + assert(success) + success = metricManagerService.deleteDefinitionByName("TestDefinition2") + assert(!success) + success = metricManagerService.deleteDefinitionByName("TestDefinition3") + assert(success) + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala new file mode 100644 index 0000000..c4d639c --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala @@ -0,0 
+1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.metadata + +import org.scalatest.FunSuite + +class MetricSourceDefinitionTest extends FunSuite { + + test("createNewMetricSourceDefinition") { + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + + assert(msd.definitionName == "testDefinition") + assert(msd.appId == "testAppId") + assert(msd.definitionSource == MetricSourceDefinitionType.API) + + assert(msd.hosts.isEmpty) + assert(msd.metricDefinitions.isEmpty) + assert(msd.associatedAnomalySubsystems.isEmpty) + assert(msd.relatedDefinitions.isEmpty) + } + + test("testAddMetricDefinition") { + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + assert(msd.metricDefinitions.isEmpty) + + msd.addMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String])) + assert(msd.metricDefinitions.nonEmpty) + } + + test("testEquals") { + val msd1 : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + val msd2 : MetricSourceDefinition = new 
MetricSourceDefinition("testDefinition", "testAppId2", MetricSourceDefinitionType.API) + assert(msd1 == msd2) + } + + test("testRemoveMetricDefinition") { + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + assert(msd.metricDefinitions.isEmpty) + + msd.addMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String])) + assert(msd.metricDefinitions.nonEmpty) + + msd.removeMetricDefinition(MetricDefinition("TestMetric", "TestApp", List.empty[String])) + assert(msd.metricDefinitions.isEmpty) + } + + test("serializeDeserialize") { + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + val msdString: String = MetricSourceDefinition.serialize(msd) + assert(msdString.nonEmpty) + + val msd2: MetricSourceDefinition = MetricSourceDefinition.deserialize(msdString) + assert(msd2 != null) + assert(msd == msd2) + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java new file mode 100644 index 0000000..7619811 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.commons.lang.StringUtils; + +public class TimelineMetricKey { + public String metricName; + public String appId; + public String instanceId = null; + public String hostName; + public byte[] uuid; + + public TimelineMetricKey(String metricName, String appId, String instanceId, String hostName, byte[] uuid) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.hostName = hostName; + this.uuid = uuid; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetricKey that = (TimelineMetricKey) o; + + if (!metricName.equals(that.metricName)) return false; + if (!appId.equals(that.appId)) return false; + if (!hostName.equals(that.hostName)) return false; + return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (hostName != null ? 
hostName.hashCode() : 0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index bb26439..e90a97f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -39,6 +39,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -50,6 +52,7 @@ import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -438,11 
+441,17 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } @Override - public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException { + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern, + boolean includeBlacklistedMetrics) throws SQLException, IOException { Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = metricMetadataManager.getMetadataCache(); - boolean includeBlacklistedMetrics = StringUtils.isNotEmpty(query) && "all".equalsIgnoreCase(query); + boolean filterByAppId = StringUtils.isNotEmpty(appId); + boolean filterByMetricName = StringUtils.isNotEmpty(metricPattern); + Pattern metricFilterPattern = null; + if (filterByMetricName) { + metricFilterPattern = Pattern.compile(metricPattern); + } // Group Metadata by AppId Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>(); @@ -451,10 +460,23 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) { continue; } - List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId()); + + String currentAppId = metricMetadata.getAppId(); + if (filterByAppId && !currentAppId.equals(appId)) { + continue; + } + + if (filterByMetricName) { + Matcher m = metricFilterPattern.matcher(metricMetadata.getMetricName()); + if (!m.find()) { + continue; + } + } + + List<TimelineMetricMetadata> metadataList = metadataByAppId.get(currentAppId); if (metadataList == null) { metadataList = new ArrayList<>(); - metadataByAppId.put(metricMetadata.getAppId(), metadataList); + metadataByAppId.put(currentAppId, metadataList); } metadataList.add(metricMetadata); @@ -464,8 +486,42 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } @Override - public Map<String, TimelineMetricMetadataKey> getUuids() 
throws SQLException, IOException { - return metricMetadataManager.getUuidKeyMap(); + public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + return metricMetadataManager.getUuid(metricName, appId, instanceId, hostname); + } + + /** + * Given a metricName, appId, instanceId and optional hostname parameter, return a set of TimelineMetricKey objects + * that will have all the unique metric instances for the above parameter filter. + * + * @param metricName + * @param appId + * @param instanceId + * @param hostname + * @return + * @throws SQLException + * @throws IOException + */ + @Override + public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + + if (StringUtils.isEmpty(hostname)) { + Set<String> hosts = new HashSet<>(); + for (String host : metricMetadataManager.getHostedAppsCache().keySet()) { + if (metricMetadataManager.getHostedAppsCache().get(host).getHostedApps().contains(appId)) { + hosts.add(host); + } + } + Set<TimelineMetricKey> timelineMetricKeys = new HashSet<>(); + for (String host : hosts) { + byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host); + timelineMetricKeys.add(new TimelineMetricKey(metricName, appId, instanceId, host, uuid)); + } + return timelineMetricKeys; + } else { + byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, hostname); + return Collections.singleton(new TimelineMetricKey(metricName, appId, instanceId, hostname, uuid)); + } } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index cdeefdc..f00bd91 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -21,11 +21,11 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import java.io.IOException; import java.sql.SQLException; @@ -81,7 +81,8 @@ public interface TimelineMetricStore { * @throws SQLException * @throws IOException */ - Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException; + Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern, + boolean includeBlacklistedMetrics) throws SQLException, IOException; TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException; /** @@ -100,7 
+101,7 @@ public interface TimelineMetricStore { */ Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException; - Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException; + byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException; /** * Return a list of known live collector nodes @@ -109,4 +110,7 @@ public interface TimelineMetricStore { List<String> getLiveInstances(); TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException; + + Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException; + } http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java index f9ad773..6b926ac 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -438,6 +438,16 @@ public class TimelineMetricMetadataManager { return ArrayUtils.addAll(metricUuid, hostUuid); } + 
public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) { + + byte[] metricUuid = getUuid(new TimelineClusterMetric(metricName, appId, instanceId, -1l)); + if (StringUtils.isNotEmpty(hostname)) { + byte[] hostUuid = getUuidForHostname(hostname); + return ArrayUtils.addAll(metricUuid, hostUuid); + } + return metricUuid; + } + public String getMetricNameFromUuid(byte[] uuid) { byte[] metricUuid = uuid; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 5d9bb35..db35686 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -50,6 +51,7 @@ import javax.servlet.http.HttpServletRequest; 
import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -434,18 +436,24 @@ public class TimelineWebServices { throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } } + @GET @Path("/metrics/metadata") @Produces({ MediaType.APPLICATION_JSON }) public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata( @Context HttpServletRequest req, @Context HttpServletResponse res, - @QueryParam("query") String query + @QueryParam("appId") String appId, + @QueryParam("metricName") String metricPattern, + @QueryParam("includeAll") String includeBlacklistedMetrics ) { init(res); try { - return timelineMetricStore.getTimelineMetricMetadata(query); + return timelineMetricStore.getTimelineMetricMetadata( + parseStr(appId), + parseStr(metricPattern), + parseBoolean(includeBlacklistedMetrics)); } catch (Exception e) { throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } @@ -486,16 +494,40 @@ public class TimelineWebServices { } @GET - @Path("/metrics/uuids") + @Path("/metrics/uuid") @Produces({ MediaType.APPLICATION_JSON }) - public Map<String, TimelineMetricMetadataKey> getUuids( + public byte[] getUuid( @Context HttpServletRequest req, - @Context HttpServletResponse res + @Context HttpServletResponse res, + @QueryParam("metricName") String metricName, + @QueryParam("appId") String appId, + @QueryParam("instanceId") String instanceId, + @QueryParam("hostname") String hostname + ) { + init(res); + + try { + return timelineMetricStore.getUuid(metricName, appId, instanceId, hostname); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @GET + @Path("/metrics/metadata/key") + @Produces({ MediaType.APPLICATION_JSON }) + public Set<TimelineMetricKey> getTimelineMetricKey( + @Context HttpServletRequest req, + 
@Context HttpServletResponse res, + @QueryParam("metricName") String metricName, + @QueryParam("appId") String appId, + @QueryParam("instanceId") String instanceId, + @QueryParam("hostname") String hostname ) { init(res); try { - return timelineMetricStore.getUuids(); + return timelineMetricStore.getTimelineMetricKey(metricName, appId, instanceId, hostname); } catch (Exception e) { throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); }
