Repository: ambari Updated Branches: refs/heads/branch-3.0-ams 0fcca47f6 -> ba9be8028
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala index 98ce0c4..db12307 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala @@ -19,17 +19,62 @@ package org.apache.ambari.metrics.adservice.resource import javax.ws.rs.core.MediaType.APPLICATION_JSON import javax.ws.rs.core.Response -import javax.ws.rs.{GET, Path, Produces} +import javax.ws.rs.{GET, Path, Produces, QueryParam} -import org.joda.time.DateTime +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.service.ADQueryService +import org.apache.commons.lang.StringUtils + +import com.google.inject.Inject @Path("/anomaly") class AnomalyResource { + @Inject + var aDQueryService: ADQueryService = _ + @GET @Produces(Array(APPLICATION_JSON)) - def default: Response = { - Response.ok.entity(Map("message" -> "Anomaly Detection Service!", - "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build() + def getTopNAnomalies(@QueryParam("type") anType: String, + @QueryParam("startTime") startTime: Long, + @QueryParam("endTime") endTime: Long, + @QueryParam("top") limit: Int): Response = { + + val anomalies: List[MetricAnomalyInstance] = aDQueryService.getTopNAnomaliesByType( + parseAnomalyType(anType), + parseStartTime(startTime), + parseEndTime(endTime), + parseTop(limit)) + + Response.ok.entity(anomalies).build() + } + + private def parseAnomalyType(anomalyType: String) : AnomalyType = { + if (StringUtils.isEmpty(anomalyType)) { + return AnomalyType.POINT_IN_TIME + } + AnomalyType.withName(anomalyType.toUpperCase) + } + + private def parseStartTime(startTime: Long) : Long = { + if (startTime > 0l) { + return startTime + } + System.currentTimeMillis() - 60*60*1000 + } + + private def parseEndTime(endTime: Long) : Long = { + if (endTime > 0l) { + return endTime + } + System.currentTimeMillis() + } + + private def parseTop(limit: Int) : Int = { + if (limit > 0) { + return limit + } + 5 } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala index 16125fa..442bf46 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala @@ -17,10 +17,11 @@ package org.apache.ambari.metrics.adservice.resource -import javax.ws.rs.{GET, Path, Produces} +import javax.ws.rs._ import javax.ws.rs.core.MediaType.APPLICATION_JSON +import javax.ws.rs.core.Response -import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricSourceDefinition} +import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey, MetricSourceDefinition} import org.apache.commons.lang.StringUtils import com.google.inject.Inject @@ -33,8 +34,76 @@ class MetricDefinitionResource { @GET @Produces(Array(APPLICATION_JSON)) - def getMetricDefinition (definitionName: String) : MetricSourceDefinition = { - null + @Path("/{name}") + def defaultGet(@PathParam("name") definitionName: String): Response = { + + if (StringUtils.isEmpty(definitionName)) { + Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build() + } + val metricSourceDefinition = metricDefinitionService.getDefinitionByName(definitionName) + if (metricSourceDefinition != null) { + Response.ok.entity(metricSourceDefinition).build() + } else { + Response.ok.entity(Map("message" -> "Definition not found")).build() + } + } + + @GET + @Produces(Array(APPLICATION_JSON)) + def getAllMetricDefinitions: Response = { + val metricSourceDefinitionMap: List[MetricSourceDefinition] = metricDefinitionService.getDefinitions + Response.ok.entity(metricSourceDefinitionMap).build() + } + + @GET + @Path("/keys") + @Produces(Array(APPLICATION_JSON)) + def getMetricKeys: Response = { + val metricKeyMap: Map[String, Set[MetricKey]] = metricDefinitionService.getMetricKeys + Response.ok.entity(metricKeyMap).build() } + @POST + @Produces(Array(APPLICATION_JSON)) + def defaultPost(definition: MetricSourceDefinition) : Response = { + if (definition == null) { + Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build() + } + val success : Boolean = metricDefinitionService.addDefinition(definition) + if (success) { + Response.ok.entity(Map("message" -> "Definition saved")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be saved")).build() + } + } + + @PUT + @Produces(Array(APPLICATION_JSON)) + def defaultPut(definition: MetricSourceDefinition) : Response = { + if (definition == null) { + Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build() + } + val success : Boolean = metricDefinitionService.updateDefinition(definition) + if (success) { + Response.ok.entity(Map("message" -> "Definition updated")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be updated")).build() + } + } + + @DELETE + @Produces(Array(APPLICATION_JSON)) + @Path("/{name}") + def defaultDelete(@PathParam("name") definitionName: String): Response = { + + if (StringUtils.isEmpty(definitionName)) { + Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build() + } + val success: Boolean = metricDefinitionService.deleteDefinitionByName(definitionName) + if (success) { + Response.ok.entity(Map("message" -> "Definition deleted")).build() + } else { + Response.ok.entity(Map("message" -> "Definition could not be deleted")).build() + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala index 22fe0ac..fd55b64 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala @@ -17,6 +17,8 @@ */ package org.apache.ambari.metrics.adservice.resource +import java.time.LocalDateTime + import javax.ws.rs.core.MediaType.APPLICATION_JSON import javax.ws.rs.core.Response import javax.ws.rs.{GET, Path, Produces} @@ -29,7 +31,8 @@ class RootResource { @Produces(Array(APPLICATION_JSON)) @GET def default: Response = { - Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build() + val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm") + Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> LocalDateTime.now)).build() } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala index 8e6f511..2cfa30f 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala @@ -18,9 +18,9 @@ package org.apache.ambari.metrics.adservice.service import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance +import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance -trait ADQueryService { +trait ADQueryService extends AbstractADService{ /** * API to return list of single metric anomalies satisfying a set of conditions from the anomaly store. @@ -30,5 +30,5 @@ trait ADQueryService { * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score. * @return */ - def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] + def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance] } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala index e5efa44..3b49208 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala @@ -16,11 +16,30 @@ * limitations under the License. */ package org.apache.ambari.metrics.adservice.service +import org.apache.ambari.metrics.adservice.db.AdAnomalyStoreAccessor import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance +import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance +import org.slf4j.{Logger, LoggerFactory} +import com.google.inject.{Inject, Singleton} + +@Singleton class ADQueryServiceImpl extends ADQueryService { + val LOG : Logger = LoggerFactory.getLogger(classOf[ADQueryServiceImpl]) + + @Inject + var adAnomalyStoreAccessor: AdAnomalyStoreAccessor = _ + + /** + * Initialize Service + */ + override def initialize(): Unit = { + LOG.info("Initializing AD Query Service...") + adAnomalyStoreAccessor.initialize() + LOG.info("Successfully initialized AD Query Service.") + } + /** * Implementation to return list of anomalies satisfying a set of conditions from the anomaly store. * @@ -30,8 +49,8 @@ class ADQueryServiceImpl extends ADQueryService { * @param limit Maximim number of anomaly metrics that need to be returned based on anomaly score. * @return */ - override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] = { - val anomalies = List.empty[SingleMetricAnomalyInstance] + override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance] = { + val anomalies = adAnomalyStoreAccessor.getMetricAnomalies(anomalyType, startTime, endTime, limit) anomalies } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala new file mode 100644 index 0000000..56bb999 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *//** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.service + +trait AbstractADService { + + /** + * Initialize Service + */ + def initialize(): Unit + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala index 63cf8c7..56ca2c1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala @@ -23,7 +23,7 @@ import org.apache.ambari.metrics.adservice.common.Season import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} class PointInTimeAnomalyInstance(val metricKey: MetricKey, val timestamp: Long, @@ -31,7 +31,7 @@ class PointInTimeAnomalyInstance(val metricKey: MetricKey, val methodType: AnomalyDetectionMethod, val anomalyScore: Double, val anomalousSeason: Season, - val modelParameters: String) extends SingleMetricAnomalyInstance { + val modelParameters: String) extends MetricAnomalyInstance { override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala index 3fc0d6f..7392d59 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala @@ -21,7 +21,7 @@ import org.apache.ambari.metrics.adservice.common.{Season, TimeRange} import org.apache.ambari.metrics.adservice.metadata.MetricKey import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType -import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance} case class TrendAnomalyInstance (metricKey: MetricKey, anomalousPeriod: TimeRange, @@ -29,7 +29,7 @@ case class TrendAnomalyInstance (metricKey: MetricKey, methodType: AnomalyDetectionMethod, anomalyScore: Double, seasonInfo: Season, - modelParameters: String) extends SingleMetricAnomalyInstance { + modelParameters: String) extends MetricAnomalyInstance { override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala index 2a4999c..e38ea40 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala @@ -17,6 +17,8 @@ */ package org.apache.ambari.metrics.adservice.app +import java.time.LocalDateTime + import javax.ws.rs.client.Client import javax.ws.rs.core.MediaType.APPLICATION_JSON @@ -37,7 +39,8 @@ class DefaultADResourceSpecTest extends FunSpec with Matchers { withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yml").getPath) { rule => val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly") .request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String]) - val now = DateTime.now.toString("MM-dd-yyyy hh:mm") + val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm") + val now = LocalDateTime.now assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}") } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala index bd38e9a..79366b1 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala @@ -18,26 +18,32 @@ package org.apache.ambari.metrics.adservice.metadata +import java.util + import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey import org.scalatest.FunSuite class AMSMetadataProviderTest extends FunSuite { test("testFromTimelineMetricKey") { - val timelineMetricKeys: java.util.Set[TimelineMetricKey] = new java.util.HashSet[TimelineMetricKey]() + val timelineMetricKeys: java.util.Set[java.util.Map[String, String]] = new java.util.HashSet[java.util.Map[String, String]]() val uuid: Array[Byte] = Array.empty[Byte] for (i <- 1 to 3) { - val key: TimelineMetricKey = new TimelineMetricKey("M" + i, "App", null, "H", uuid) - timelineMetricKeys.add(key) + val keyMap: java.util.Map[String, String] = new util.HashMap[String, String]() + keyMap.put("metricName", "M" + i) + keyMap.put("appId", "App") + keyMap.put("hostname", "H") + keyMap.put("uuid", new String(uuid)) + timelineMetricKeys.add(keyMap) } val aMSMetadataProvider : ADMetadataProvider = new ADMetadataProvider(new MetricCollectorConfiguration) - val metricKeys : Set[MetricKey] = aMSMetadataProvider.fromTimelineMetricKey(timelineMetricKeys) + val metricKeys : Set[MetricKey] = aMSMetadataProvider.getMetricKeys(timelineMetricKeys) assert(metricKeys.size == 3) } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala index 0149673..c4d4dbc 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala @@ -20,6 +20,10 @@ package org.apache.ambari.metrics.adservice.metadata import org.apache.commons.lang.SerializationUtils import org.scalatest.FunSuite +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule + class MetricSourceDefinitionTest extends FunSuite { test("createNewMetricSourceDefinition") { @@ -65,7 +69,12 @@ class MetricSourceDefinitionTest extends FunSuite { } test("serializeDeserialize") { - val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API) + + val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "A1", MetricSourceDefinitionType.API) + msd.hosts = List("h1") + msd.addMetricDefinition(MetricDefinition("M1", null, List("h2"))) + msd.addMetricDefinition(MetricDefinition("M1", "A2", null)) + val msdByteArray: Array[Byte] = SerializationUtils.serialize(msd) assert(msdByteArray.nonEmpty) @@ -73,5 +82,10 @@ class MetricSourceDefinitionTest extends FunSuite { assert(msd2 != null) assert(msd == msd2) + val mapper : ObjectMapper = new ObjectMapper() + mapper.registerModule(new ADServiceScalaModule) + + System.out.print(mapper.writeValueAsString(msd)) + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index de49235..3c32b73 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -27,12 +27,6 @@ <artifactId>ambari-metrics-common</artifactId> <name>Ambari Metrics Common</name> - <properties> - <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version> - <hbase.version>1.1.2.2.6.0.3-8</hbase.version> - <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version> - </properties> - <build> <plugins> <plugin> @@ -143,45 +137,6 @@ <dependencies> <dependency> - <groupId>org.apache.phoenix</groupId> - <artifactId>phoenix-core</artifactId> - <version>${phoenix.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>net.sf.ehcache</groupId> <artifactId>ehcache</artifactId> <version>2.10.0</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java deleted file mode 100644 index 7619811..0000000 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metrics2.sink.timeline; - -import org.apache.commons.lang.StringUtils; - -public class TimelineMetricKey { - public String metricName; - public String appId; - public String instanceId = null; - public String hostName; - public byte[] uuid; - - public TimelineMetricKey(String metricName, String appId, String instanceId, String hostName, byte[] uuid) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.hostName = hostName; - this.uuid = uuid; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineMetricKey that = (TimelineMetricKey) o; - - if (!metricName.equals(that.metricName)) return false; - if (!appId.equals(that.appId)) return false; - if (!hostName.equals(that.hostName)) return false; - return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); - } - - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (hostName != null ? hostName.hashCode() : 0); - return result; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java deleted file mode 100644 index 72e5fb5..0000000 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline.query; - - -import org.apache.hadoop.hbase.util.RetryCounterFactory; - -import java.sql.Connection; -import java.sql.SQLException; - -/** - * - */ -public interface ConnectionProvider { - public Connection getConnection() throws SQLException; - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java deleted file mode 100644 index a28a433..0000000 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline.query; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { - - static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); - private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; - private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - private static final String ZNODE_PARENT = "zookeeper.znode.parent"; - - private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; - private final String url; - - private Configuration hbaseConf; - - public DefaultPhoenixDataSource(Configuration hbaseConf) { - this.hbaseConf = hbaseConf; - String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); - String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); - String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); - if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { - throw new IllegalStateException("Unable to find Zookeeper quorum to " + - "access HBase store using Phoenix."); - } - - url = String.format(connectionUrl, - zookeeperQuorum, - zookeeperClientPort, - znodeParent); - } - - /** - * Get HBaseAdmin for table ops. - * @return @HBaseAdmin - * @throws IOException - */ - public HBaseAdmin getHBaseAdmin() throws IOException { - return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); - } - - /** - * Get JDBC connection to HBase store. Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. - * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - - LOG.debug("Metric store connection url: " + url); - try { - return DriverManager.getConnection(url); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - - throw e; - } - } - - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java deleted file mode 100644 index 194c769..0000000 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.hadoop.metrics2.sink.timeline.query; - -import org.apache.hadoop.hbase.client.HBaseAdmin; - -import java.io.IOException; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface PhoenixConnectionProvider extends ConnectionProvider { - /** - * Get HBaseAdmin for the Phoenix connection - * @return - * @throws IOException - */ - HBaseAdmin getHBaseAdmin() throws IOException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index e90a97f..20b344f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -52,7 +53,6 @@ import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -497,30 +497,44 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time * @param metricName * @param appId * @param instanceId - * @param hostname + * @param hosts * @return * @throws SQLException * @throws IOException */ @Override - public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) + throws SQLException, IOException { + Set<Map<String, String>> timelineMetricKeys = new HashSet<>(); - if (StringUtils.isEmpty(hostname)) { - Set<String> hosts = new HashSet<>(); + if (CollectionUtils.isEmpty(hosts)) { + Set<String> hostsFromMetadata = new HashSet<>(); for (String host : metricMetadataManager.getHostedAppsCache().keySet()) { if (metricMetadataManager.getHostedAppsCache().get(host).getHostedApps().contains(appId)) { - hosts.add(host); + hostsFromMetadata.add(host); } } - Set<TimelineMetricKey> timelineMetricKeys = new HashSet<>(); - for (String host : hosts) { + for (String host : hostsFromMetadata) { byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host); - timelineMetricKeys.add(new TimelineMetricKey(metricName, appId, instanceId, host, uuid)); + Map<String, String> keyMap = new HashMap<>(); + keyMap.put("metricName", metricName); + keyMap.put("appId", appId); + keyMap.put("hostname", host); + keyMap.put("uuid", new String(uuid)); + timelineMetricKeys.add(keyMap); } return timelineMetricKeys; } else { - byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, hostname); - return Collections.singleton(new TimelineMetricKey(metricName, appId, instanceId, hostname, uuid)); + for (String host : hosts) { + byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host); + Map<String, String> keyMap = new HashMap<>(); + keyMap.put("metricName", metricName); + keyMap.put("appId", appId); + keyMap.put("hostname", host); + keyMap.put("uuid", new String(uuid)); + timelineMetricKeys.add(keyMap); + } + return timelineMetricKeys; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 9d595a4..cf382f1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; @@ -139,8 +140,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; @@ -210,7 +211,7 @@ public class PhoenixHBaseAccessor { private HashMap<String, String> tableTTL = new HashMap<>(); private final TimelineMetricConfiguration configuration; - private List<InternalMetricsSource> rawMetricsSources; + private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>(); public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) { this(TimelineMetricConfiguration.getInstance(), dataSource); @@ -459,6 +460,23 @@ public class PhoenixHBaseAccessor { return mapper.readValue(json, metricValuesTypeRef); } + private Connection getConnectionRetryingOnException() + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + /** * Get JDBC connection to HBase store. Assumption is that the hbase * configuration is present on the classpath and loaded by the caller into @@ -491,7 +509,7 @@ public class PhoenixHBaseAccessor { try { LOG.info("Initializing metrics schema..."); - conn = dataSource.getConnectionRetryingOnException(retryCounterFactory); + conn = getConnectionRetryingOnException(); stmt = conn.createStatement(); // Metadata http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index f00bd91..349ef83 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -21,7 +21,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; @@ -111,6 +110,6 @@ public interface TimelineMetricStore { TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException; - Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException; + Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException; } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java new file mode 100644 index 0000000..391af27 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + + +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * + */ +public interface ConnectionProvider { + public Connection getConnection() throws SQLException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..67afe6b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DefaultPhoenixDataSource implements PhoenixConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + private Configuration hbaseConf; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + this.hbaseConf = hbaseConf; + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get HBaseAdmin for table ops. + * @return @HBaseAdmin + * @throws IOException + */ + public HBaseAdmin getHBaseAdmin() throws IOException { + return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin(); + } + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java new file mode 100644 index 0000000..cacbcfb --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + +import org.apache.hadoop.hbase.client.HBaseAdmin; + +import java.io.IOException; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface PhoenixConnectionProvider extends ConnectionProvider { + /** + * Get HBaseAdmin for the Phoenix connection + * @return + * @throws IOException + */ + HBaseAdmin getHBaseAdmin() throws IOException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index db35686..dc401e6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -27,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -69,7 +69,9 @@ import javax.xml.bind.annotation.XmlRootElement; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -516,7 +518,7 @@ public class TimelineWebServices { @GET @Path("/metrics/metadata/key") @Produces({ MediaType.APPLICATION_JSON }) - public Set<TimelineMetricKey> getTimelineMetricKey( + public Set<Map<String, String>> getTimelineMetricKey( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("metricName") String metricName, @@ -527,7 +529,11 @@ public class TimelineWebServices { init(res); try { - return timelineMetricStore.getTimelineMetricKey(metricName, appId, instanceId, hostname); + if (StringUtils.isEmpty(hostname)) { + return timelineMetricStore.getTimelineMetricKeys(metricName, appId, instanceId, Collections.EMPTY_LIST); + } else { + return timelineMetricStore.getTimelineMetricKeys(metricName, appId, instanceId, Arrays.asList(StringUtils.split(hostname, ","))); + } } catch (Exception e) { throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 7b70a80..03205e7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; import org.apache.zookeeper.ClientCnxn; import org.easymock.EasyMock; import org.junit.After; http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 9c55305..741bb3c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -224,17 +224,6 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { return connection; } - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - Connection connection = null; - try { - connection = DriverManager.getConnection(getUrl()); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - } - return connection; - } - }); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index 5d81faa..50ff656 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.phoenix.exception.PhoenixIOException; import org.easymock.EasyMock; @@ -96,10 +96,6 @@ public class PhoenixHBaseAccessorTest { return null; } - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - return null; - } }; accessor = new PhoenixHBaseAccessor(connectionProvider); @@ -256,11 +252,6 @@ public class PhoenixHBaseAccessorTest { public Connection getConnection() throws SQLException { return connection; } - - @Override - public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException { - return connection; - } }; accessor = new PhoenixHBaseAccessor(connectionProvider); http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index 32cc813..9b79fa9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -30,7 +30,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; @@ -126,7 +125,7 @@ public class TestTimelineMetricStore implements TimelineMetricStore { } @Override - public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException { return Collections.emptySet(); } }
