AMBARI-22348 : Metric Definition Service V1 Implementation. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a42cbc5f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a42cbc5f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a42cbc5f Branch: refs/heads/branch-3.0-ams Commit: a42cbc5f3f6fbdf20845e037b111a18c0b7ad55d Parents: 1ec0477 Author: Aravindan Vijayan <[email protected]> Authored: Wed Nov 1 15:38:20 2017 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Wed Nov 1 15:38:28 2017 -0700 ---------------------------------------------------------------------- .../pom.xml | 6 + .../prototype/common/StatisticUtils.java | 3 - .../prototype/core/MetricSparkConsumer.java | 7 +- .../prototype/core/PointInTimeADSystem.java | 2 +- .../adservice/prototype/core/TrendADSystem.java | 2 +- .../prototype/methods/MetricAnomaly.java | 2 - .../prototype/methods/hsdev/HsdevTechnique.java | 5 +- .../prototype/methods/kstest/KSTechnique.java | 2 +- .../testing/utilities/MetricAnomalyTester.java | 17 -- .../src/main/resources/config.yml | 27 +++ .../adservice/app/AnomalyDetectionApp.scala | 1 + .../app/AnomalyDetectionAppConfig.scala | 53 +++++- .../app/AnomalyDetectionAppModule.scala | 1 + .../common/ADServiceConfiguration.scala | 74 -------- .../common/PhoenixQueryConstants.scala | 109 ----------- .../ambari/metrics/adservice/common/Range.scala | 44 +++++ .../metrics/adservice/common/Season.scala | 122 +++++++++++++ .../metrics/adservice/common/SeasonType.scala | 24 +++ .../metrics/adservice/common/TimeRange.scala | 39 ++++ .../configuration/AdServiceConfiguration.scala | 40 ++++ .../configuration/HBaseConfiguration.scala | 54 ++++++ .../MetricCollectorConfiguration.scala | 52 ++++++ .../MetricManagerServiceConfiguration.scala | 34 ++++ .../adservice/db/AdMetadataStoreAccessor.scala | 53 ++++++ .../adservice/db/AdMetadataStoreConstants.scala | 39 ++++ .../db/PhoenixAnomalyStoreAccessor.scala | 110 ++++++++++- 
.../adservice/db/PhoenixQueryConstants.scala | 108 +++++++++++ .../adservice/metadata/ADMetadataProvider.scala | 112 ++++++++++++ .../metadata/InputMetricDefinitionParser.scala | 52 ++++++ .../adservice/metadata/MetricDefinition.scala | 69 +++++++ .../metrics/adservice/metadata/MetricKey.scala | 50 +++++ .../metadata/MetricManagerService.scala | 64 +++++++ .../metadata/MetricManagerServiceImpl.scala | 183 +++++++++++++++++++ .../metadata/MetricMetadataProvider.scala | 31 ++++ .../metadata/MetricSourceDefinition.scala | 119 ++++++++++++ .../metadata/MetricSourceDefinitionType.scala | 26 +++ .../model/AnomalyDetectionMethod.scala | 23 +++ .../metrics/adservice/model/AnomalyType.scala | 26 +++ .../model/SingleMetricAnomalyInstance.scala | 29 +++ .../adservice/resource/AnomalyResource.scala | 4 +- .../resource/MetricDefinitionResource.scala | 28 +++ .../adservice/resource/RootResource.scala | 4 +- .../adservice/resource/SubsystemResource.scala | 26 +++ .../adservice/service/ADQueryService.scala | 12 ++ .../adservice/service/ADQueryServiceImpl.scala | 15 ++ .../spark/prototype/MetricAnomalyDetector.scala | 16 -- .../spark/prototype/SparkPhoenixReader.scala | 5 - .../PointInTimeAnomalyInstance.scala | 48 +++++ .../subsystem/trend/TrendAnomalyInstance.scala | 29 +++ .../metrics/adservice/prototype/TestTukeys.java | 2 +- .../app/AnomalyDetectionAppConfigTest.scala | 54 ++++++ .../app/DefaultADResourceSpecTest.scala | 2 +- .../common/ADManagerConfigurationTest.scala | 40 ---- .../metrics/adservice/common/RangeTest.scala | 37 ++++ .../metrics/adservice/common/SeasonTest.scala | 91 +++++++++ .../metadata/AMSMetadataProviderTest.scala | 43 +++++ .../metadata/MetricManagerServiceTest.scala | 130 +++++++++++++ .../metadata/MetricSourceDefinitionTest.scala | 72 ++++++++ .../sink/timeline/TimelineMetricKey.java | 59 ++++++ .../timeline/HBaseTimelineMetricsService.java | 68 ++++++- .../metrics/timeline/TimelineMetricStore.java | 10 +- .../TimelineMetricMetadataManager.java | 10 + 
.../webapp/TimelineWebServices.java | 44 ++++- .../timeline/TestTimelineMetricStore.java | 11 +- 64 files changed, 2369 insertions(+), 305 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index e96e957..44bdc1f 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -446,5 +446,11 @@ <artifactId>metrics-core</artifactId> <version>3.2.5</version> </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>2.5</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java index 7f0aed3..0a22e50 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java @@ -18,10 +18,7 @@ package 
org.apache.ambari.metrics.adservice.prototype.common; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; public class StatisticUtils { http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java index e8257e5..addeda7 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java @@ -35,10 +35,15 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; -import java.util.*; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java index 0a2271a..f379605 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java @@ -17,8 +17,8 @@ */ package org.apache.ambari.metrics.adservice.prototype.core; -import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel; import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique; import org.apache.commons.logging.Log; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java index f5ec83a..80212b3 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java +++ 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java @@ -17,9 +17,9 @@ */ package org.apache.ambari.metrics.adservice.prototype.core; +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique; -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java index 251603b..60ff11c 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java @@ -18,8 +18,6 @@ package org.apache.ambari.metrics.adservice.prototype.methods; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; public class MetricAnomaly implements Serializable{ 
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java index 6facc99..855cc70 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java @@ -21,14 +21,15 @@ import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median; -import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev; import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.Map; +import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median; +import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev; + public class HsdevTechnique implements Serializable { private Map<String, Double> hsdevMap; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java index 4727c6f..0dc679e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java @@ -20,8 +20,8 @@ package org.apache.ambari.metrics.adservice.prototype.methods.kstest; import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java index d079e66..10b3a71 100644 --- 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java @@ -18,23 +18,6 @@ package org.apache.ambari.metrics.adservice.prototype.testing.utilities; -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface; -import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - /** * Class which was originally used to send test series from AMS to Spark through Kafka. 
*/ http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml index bd88d57..6953745 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml @@ -1,3 +1,15 @@ +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. 
+ server: applicationConnectors: - type: http @@ -7,3 +19,18 @@ server: logging: type: external + +metricManagerService: + inputDefinitionDirectory: /etc/adservice/conf/input-definitions-directory + +metricsCollector: + hostPortList: host1:6188,host2:6188 + metadataEndpoint: /v1/timeline/metrics/metadata/keys + +adQueryService: + anomalyDataTtl: 604800 + +#subsystemService: +# spark: +# pointInTime: +# trend: \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala index b7f217e..8b3a829 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} import com.fasterxml.jackson.datatype.joda.JodaModule import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider import com.fasterxml.jackson.module.scala.DefaultScalaModule + import io.dropwizard.Application import io.dropwizard.setup.Environment http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala index 9e6cc6d..be8d027 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala @@ -1,7 +1,3 @@ -package org.apache.ambari.metrics.adservice.app - -import io.dropwizard.Configuration - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +15,55 @@ import io.dropwizard.Configuration * See the License for the specific language governing permissions and * limitations under the License. */ + +package org.apache.ambari.metrics.adservice.app + +import javax.validation.Valid + +import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricManagerServiceConfiguration} + +import com.fasterxml.jackson.annotation.JsonProperty + +import io.dropwizard.Configuration + +/** + * Top Level AD System Manager config items. 
+ */ class AnomalyDetectionAppConfig extends Configuration { + /* + Metric Definition Service configuration + */ + @Valid + private val metricManagerServiceConfiguration = new MetricManagerServiceConfiguration + + @Valid + private val metricCollectorConfiguration = new MetricCollectorConfiguration + + /* + Anomaly Service configuration + */ + @Valid + private val adServiceConfiguration = new AdServiceConfiguration + + /* + HBase Conf + */ + def getHBaseConf : org.apache.hadoop.conf.Configuration = { + HBaseConfiguration.getHBaseConf + } + + @JsonProperty("metricManagerService") + def getMetricManagerServiceConfiguration: MetricManagerServiceConfiguration = { + metricManagerServiceConfiguration + } + + @JsonProperty("adQueryService") + def getAdServiceConfiguration: AdServiceConfiguration = { + adServiceConfiguration + } + + @JsonProperty("metricsCollector") + def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration + } http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala index 338c97b..7425a7e 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala @@ -23,6 +23,7 @@ import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServi import 
com.codahale.metrics.health.HealthCheck import com.google.inject.AbstractModule import com.google.inject.multibindings.Multibinder + import io.dropwizard.setup.Environment class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule { http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala deleted file mode 100644 index 248c74e..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.common - -import java.net.{MalformedURLException, URISyntaxException} - -import org.apache.hadoop.conf.Configuration - -object ADServiceConfiguration { - - private val AMS_AD_SITE_CONFIGURATION_FILE = "ams-ad-site.xml" - private val HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml" - - val ANOMALY_METRICS_TTL = "timeline.metrics.anomaly.data.ttl" - - private var hbaseConf: org.apache.hadoop.conf.Configuration = _ - private var adConf: org.apache.hadoop.conf.Configuration = _ - - def initConfigs(): Unit = { - - var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader - if (classLoader == null) classLoader = getClass.getClassLoader - - try { - val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE) - if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.") - - hbaseConf = new Configuration(true) - hbaseConf.addResource(hbaseResUrl.toURI.toURL) - - val adSystemConfigUrl = classLoader.getResource(AMS_AD_SITE_CONFIGURATION_FILE) - if (adSystemConfigUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. 
No ams-ad-site present in the classpath") - - adConf = new Configuration(true) - adConf.addResource(adSystemConfigUrl.toURI.toURL) - - } catch { - case me : MalformedURLException => println("MalformedURLException") - case ue : URISyntaxException => println("URISyntaxException") - } - } - - def getHBaseConf: org.apache.hadoop.conf.Configuration = { - hbaseConf - } - - def getAdConf: org.apache.hadoop.conf.Configuration = { - adConf - } - - def getAnomalyDataTtl: Int = { - if (adConf != null) return adConf.get(ANOMALY_METRICS_TTL, "604800").toInt - 604800 - } - - /** - * ttl - * - */ -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala deleted file mode 100644 index 17173ec..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.common - -object PhoenixQueryConstants { - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - /* Table Name constants */ - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - val METRIC_PROFILE_TABLE_NAME = "METRIC_PROFILE" - val METHOD_PARAMETERS_TABLE_NAME = "METHOD_PARAMETERS" - val PIT_ANOMALY_METRICS_TABLE_NAME = "PIT_METRIC_ANOMALIES" - val TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES" - val MODEL_SNAPSHOT = "MODEL_SNAPSHOT" - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - /* CREATE statement constants */ - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - val CREATE_METRIC_PROFILE_TABLE = "" - - val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + - "METHOD_NAME VARCHAR, " + - "METHOD_TYPE VARCHAR, " + - "PARAMETERS VARCHAR " + - "CONSTRAINT pk PRIMARY KEY (METHOD_NAME)) " + - "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" - - val CREATE_PIT_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + - "METRIC_UUID BINARY(20) NOT NULL, " + - "METHOD_NAME VARCHAR, " + - "ANOMALY_TIMESTAMP UNSIGNED_LONG NOT NULL, " + - "METRIC_VALUE DOUBLE, " + - "SEASONAL_INFO VARCHAR, " + - "ANOMALY_SCORE DOUBLE, " + - "MODEL_SNAPSHOT 
VARCHAR, " + - "DETECTION_TIME UNSIGNED_LONG " + - "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " + - "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" - - val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + - "METRIC_UUID BINARY(20) NOT NULL, " + - "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " + - "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " + - "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " + - "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " + - "METHOD_NAME VARCHAR, " + - "ANOMALY_SCORE DOUBLE, " + - "MODEL_SNAPSHOT VARCHAR, " + - "DETECTION_TIME UNSIGNED_LONG " + - "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " + - "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" - - val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + - "METRIC_UUID BINARY(20), " + - "METHOD_NAME VARCHAR, " + - "METHOD_TYPE VARCHAR, " + - "PARAMETERS VARCHAR " + - "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " + - "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " + - "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - /* UPSERT statement constants */ - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)" - - val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " + - "SEASONAL_INFO, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" - - val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, 
ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, " + - "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" - - val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)" - - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - /* GET statement constants */ - ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - val GET_METHOD_PAREMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s" - - val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " + - "ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METRIC_METRIC_UUID = ? AND ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " + - "ORDER BY ANOMALY_SCORE DESC" - - val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " + - "ANOMALY_PERIOD_START, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METHOD = ? AND ANOMALY_PERIOD_END > ? " + - "AND TEST_END_TIME <= ? 
ORDER BY ANOMALY_SCORE DESC" - - val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE UUID = %s AND METHOD_NAME = %s" - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala new file mode 100644 index 0000000..003c18f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/Range.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.common + +/** + * Class to capture a Range in a Season. + * For example Monday - Wednesday is a 'Range' in a DAY Season. 
/**
 * An inclusive interval of integer values within one dimension of a Season.
 * For example Monday - Wednesday is a 'Range' in a DAY Season.
 *
 * When `lower` is greater than `higher` the interval wraps around the end of
 * the cycle, e.g. Range(6, 2) covers 6, 7, ... and then ..., 1, 2.
 *
 * Equality and hashing are the structural ones the compiler generates for a
 * case class, so comparing against an unrelated object yields `false`
 * instead of failing with a ClassCastException.
 *
 * @param lower  lower end (inclusive)
 * @param higher higher end (inclusive)
 */
case class Range(lower: Int, higher: Int) {

  /**
   * Checks whether a value falls inside this range, both ends included.
   *
   * @param value value to test
   * @return true if the value lies in [lower, higher]; for a wrapping range
   *         (lower > higher) true if the value is at or above `lower`, or at
   *         or below `higher`
   */
  def withinRange(value: Int): Boolean =
    if (lower <= higher) {
      value >= lower && value <= higher
    } else {
      // Wrapping range: only the open gap (higher, lower) is excluded.
      value >= lower || value <= higher
    }
}
package org.apache.ambari.metrics.adservice.common

import java.time.DayOfWeek
import java.util.Calendar

import javax.xml.bind.annotation.XmlRootElement

import org.apache.ambari.metrics.adservice.common.SeasonType.SeasonType

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

/**
 * Class to capture a 'Season' for a metric anomaly.
 * A Season is a combination of DAY Range and HOUR Range.
 *
 * DAY values are compared against `Calendar.DAY_OF_WEEK` and HOUR values
 * against `Calendar.HOUR_OF_DAY`. A range whose `lower` is -1 acts as a
 * wildcard for that dimension.
 *
 * Equality and hashing are the structural ones generated for the case class.
 *
 * @param DAY Day Range
 * @param HOUR Hour Range
 */
@XmlRootElement
case class Season(var DAY: Range, var HOUR: Range) {

  /**
   * Checks whether a timestamp falls into this season.
   * The timestamp is interpreted through `Calendar.getInstance`, i.e. with the
   * JVM default time zone and locale.
   *
   * @param timestamp time in milliseconds, as accepted by `Calendar.setTimeInMillis`
   * @return true if both the day-of-week and hour-of-day constraints hold
   */
  def belongsTo(timestamp: Long): Boolean = {
    val calendar = Calendar.getInstance
    calendar.setTimeInMillis(timestamp)
    val dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK)
    val hourOfDay = calendar.get(Calendar.HOUR_OF_DAY)

    // lower == -1 marks a dimension as "any"; the hour is only inspected when the day matched.
    (DAY.lower == -1 || DAY.withinRange(dayOfWeek)) &&
      (HOUR.lower == -1 || HOUR.withinRange(hourOfDay))
  }

  /**
   * Human readable form, e.g. `DAY : [MONDAY,WEDNESDAY] HOUR : [9,17]`.
   * A null dimension is left out; day values that do not map to a weekday
   * (such as the -1 wildcard) are printed as plain numbers instead of throwing.
   */
  override def toString: String = {
    val dayPart = Option(DAY).fold("") { day =>
      "DAY : [" + Season.dayLabel(day.lower) + "," + Season.dayLabel(day.higher) + "]"
    }
    val hourPart = Option(HOUR).fold("") { hour =>
      " HOUR : [" + hour.lower + "," + hour.higher + "]"
    }
    dayPart + hourPart
  }
}

object Season {

  def apply(DAY: Range, HOUR: Range): Season = new Season(DAY, HOUR)

  /**
   * Builds a season constrained in a single dimension; the other dimension
   * gets the wildcard range (-1, -1).
   *
   * @param range      the constrained range
   * @param seasonType the dimension `range` applies to
   */
  def apply(range: Range, seasonType: SeasonType): Season = {
    if (seasonType.equals(SeasonType.DAY)) {
      new Season(range, Range(-1, -1))
    } else {
      new Season(Range(-1, -1), range)
    }
  }

  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  /**
   * Returns, in input order, the seasons that the timestamp belongs to.
   */
  def getSeasons(timestamp: Long, seasons: List[Season]): List[Season] =
    seasons.filter(_.belongsTo(timestamp))

  /** Serializes a season to a string using the shared Jackson mapper. */
  def serialize(season: Season): String = {
    mapper.writeValueAsString(season)
  }

  /** Reads a season back from a string produced by `serialize`. */
  def deserialize(seasonString: String): Season = {
    mapper.readValue[Season](seasonString)
  }

  /**
   * Renders a Calendar-style day number as a weekday name.
   * The value is shifted by one (with 0 mapped to 7) to get the java.time
   * numbering, where 1 is Monday and 7 is Sunday. Anything that still falls
   * outside 1..7 is returned as its decimal string.
   */
  private def dayLabel(calendarDay: Int): String = {
    val shifted = calendarDay - 1
    val isoDay = if (shifted == 0) 7 else shifted
    if (isoDay >= 1 && isoDay <= 7) DayOfWeek.of(isoDay).toString else calendarDay.toString
  }
}
package org.apache.ambari.metrics.adservice.common

/**
 * The dimensions a Season range can apply to: the day of the week or the
 * hour of the day.
 */
object SeasonType extends Enumeration {

  type SeasonType = Value

  val DAY: Value = Value
  val HOUR: Value = Value
}
package org.apache.ambari.metrics.adservice.common

import java.util.Date

/**
 * A special form of a 'Range' class to denote Time range.
 *
 * Both ends are rendered through `java.util.Date`, i.e. they are treated as
 * milliseconds since the epoch. Equality and hashing are the structural ones
 * generated for the case class, so comparing against an unrelated object
 * yields `false` instead of failing with a ClassCastException.
 *
 * @param startTime start of the range
 * @param endTime   end of the range
 */
case class TimeRange(startTime: Long, endTime: Long) {

  /** Renders both ends as dates: `StartTime=<date>, EndTime=<date>`. */
  override def toString: String = {
    "StartTime=" + new Date(startTime) + ", EndTime=" + new Date(endTime)
  }
}
package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

/**
 * Configuration bean holding the Anomaly Detection service settings.
 * Accessors carry `@JsonProperty` so the bean can be bound by Jackson.
 */
class AdServiceConfiguration {

  // TTL applied to stored anomaly data; the unit is not stated in this class.
  @NotNull
  var anomalyDataTtl: Long = 0L

  /** @return the configured anomaly data TTL */
  @JsonProperty
  def getAnomalyDataTtl: Long = this.anomalyDataTtl

  /** @param anomalyDataTtl the anomaly data TTL to use */
  @JsonProperty
  def setAnomalyDataTtl(anomalyDataTtl: Long): Unit =
    this.anomalyDataTtl = anomalyDataTtl
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.adservice.configuration

import java.net.{MalformedURLException, URISyntaxException}

import org.apache.hadoop.conf.Configuration

/**
 * Holder for the Hadoop `Configuration` used to reach HBase.
 * The configuration is populated from the `hbase-site.xml` found on the
 * classpath the first time it is requested.
 */
object HBaseConfiguration {

  val HBASE_SITE_CONFIGURATION_FILE: String = "hbase-site.xml"
  val hbaseConf: org.apache.hadoop.conf.Configuration = new Configuration(true)
  var isInitialized: Boolean = false

  /**
   * Adds `hbase-site.xml` from the classpath to `hbaseConf`, once.
   *
   * URL conversion problems are reported on stderr and leave the object
   * uninitialized, so a later call tries again.
   *
   * @throws IllegalStateException if no hbase-site.xml is on the classpath
   */
  def initConfigs(): Unit = {
    if (!isInitialized) {
      // Prefer the thread context class loader; fall back to the loader of this object.
      val classLoader: ClassLoader =
        Option(Thread.currentThread.getContextClassLoader).getOrElse(getClass.getClassLoader)

      try {
        val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE)
        if (hbaseResUrl == null) throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.")

        hbaseConf.addResource(hbaseResUrl.toURI.toURL)
        isInitialized = true

      } catch {
        // Best effort: keep running, but say which file failed and why.
        case me: MalformedURLException =>
          System.err.println("MalformedURLException while loading " + HBASE_SITE_CONFIGURATION_FILE + ": " + me.getMessage)
        case ue: URISyntaxException =>
          System.err.println("URISyntaxException while loading " + HBASE_SITE_CONFIGURATION_FILE + ": " + ue.getMessage)
      }
    }
  }

  /**
   * Returns the shared configuration, initializing it first if needed.
   */
  def getHBaseConf: org.apache.hadoop.conf.Configuration = {
    if (!isInitialized) {
      initConfigs()
    }
    hbaseConf
  }
}
package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

/**
 * Configuration bean with the Metrics Collector connection settings.
 * Accessors carry `@JsonProperty` so the bean can be bound by Jackson.
 */
class MetricCollectorConfiguration {

  // Collector endpoints as one string; presumably host:port entries, the exact
  // format is not defined in this class.
  @NotNull
  private var hostPortList: String = null

  // Endpoint used for metadata requests; presumably a path on the collector.
  @NotNull
  private var metadataEndpoint: String = null

  /** @return the configured collector host/port list */
  @JsonProperty
  def getHostPortList: String = this.hostPortList

  /** @return the configured metadata endpoint */
  @JsonProperty
  def getMetadataEndpoint: String = this.metadataEndpoint

  /** @param hostPortList the collector host/port list to use */
  @JsonProperty
  def setHostPortList(hostPortList: String): Unit =
    this.hostPortList = hostPortList

  /** @param metadataEndpoint the metadata endpoint to use */
  @JsonProperty
  def setMetadataEndpoint(metadataEndpoint: String): Unit =
    this.metadataEndpoint = metadataEndpoint
}
package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

/**
 * Class to capture the Metric Definition Service configuration.
 *
 * The value is bound through an explicit `@JsonProperty` setter, the same way
 * the other configuration beans in this package are, rather than by letting
 * Jackson write a final field reflectively.
 */
class MetricManagerServiceConfiguration {

  // Directory holding the input definitions; defaults to the empty string.
  @NotNull
  private var inputDefinitionDirectory: String = ""

  /** @return the configured input definition directory */
  @JsonProperty
  def getInputDefinitionDirectory: String = inputDefinitionDirectory

  /** @param inputDefinitionDirectory the input definition directory to use */
  @JsonProperty
  def setInputDefinitionDirectory(inputDefinitionDirectory: String): Unit = {
    this.inputDefinitionDirectory = inputDefinitionDirectory
  }
}
package org.apache.ambari.metrics.adservice.db

import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition

/**
 * Trait used to talk to the AD Metadata Store.
 * It covers reading, saving and removing metric source (component)
 * definitions; how and where they are persisted is up to the implementation.
 */
trait AdMetadataStoreAccessor {

  /**
   * Return all saved component definitions from DB.
   * @return the saved definitions
   */
  def getSavedInputDefinitions: List[MetricSourceDefinition]

  /**
   * Save a set of component definitions
   * @param metricSourceDefinitions Set of component definitions
   * @return Success / Failure
   */
  def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]) : Boolean

  /**
   * Save a component definition
   * @param metricSourceDefinition component definition
   * @return Success / Failure
   */
  def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition) : Boolean

  /**
   * Delete a component definition
   * @param definitionName name of the component definition to delete
   * @return Success / Failure (presumably, matching the save methods; confirm against implementations)
   */
  def removeInputDefinition(definitionName: String) : Boolean
}
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala new file mode 100644 index 0000000..3d273a3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreConstants.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.metrics.adservice.db

/**
 * Table name and DDL template used by the AD metadata store.
 */
object AdMetadataStoreConstants {

  // ---------------------------------------------------------------------------
  // Table name constants
  // ---------------------------------------------------------------------------

  val METRIC_PROFILE_TABLE_NAME = "METRIC_DEFINITION"

  // ---------------------------------------------------------------------------
  // CREATE statement constants
  // ---------------------------------------------------------------------------

  /**
   * DDL template for the metric definition table. The single `%s` placeholder
   * takes the table name.
   */
  val CREATE_METRIC_DEFINITION_TABLE: String = {
    val columnDefinitions = Seq(
      "DEFINITION_NAME VARCHAR",
      "DEFINITION_JSON VARCHAR",
      "DEFINITION_SOURCE NUMBER",
      "CREATED_TIME TIMESTAMP",
      "UPDATED_TIME TIMESTAMP"
    ).mkString(", ")

    "CREATE TABLE IF NOT EXISTS %s (" + columnDefinitions + " CONSTRAINT pk PRIMARY KEY (DEFINITION_NAME))"
  }
}
SQLException} +import java.sql.{Connection, PreparedStatement, ResultSet, SQLException} +import java.util.concurrent.TimeUnit.SECONDS -import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants} +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.common._ +import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration +import org.apache.ambari.metrics.adservice.metadata.MetricKey +import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod +import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType +import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance} +import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance +import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance import org.apache.hadoop.hbase.util.RetryCounterFactory import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider} -import java.util.concurrent.TimeUnit.SECONDS + +import com.google.inject.Inject object PhoenixAnomalyStoreAccessor { - private var datasource: PhoenixConnectionProvider = _ + @Inject + var configuration: AnomalyDetectionAppConfig = _ + + var datasource: PhoenixConnectionProvider = _ def initAnomalyMetricSchema(): Unit = { - val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf) + val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf) val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt) - val ttl = ADServiceConfiguration.getAnomalyDataTtl + val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl try { var conn = datasource.getConnectionRetryingOnException(retryCounterFactory) var stmt = conn.createStatement @@ -64,4 
+77,89 @@ object PhoenixAnomalyStoreAccessor { @throws[SQLException] def getConnection: Connection = datasource.getConnection + + def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = { + val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance] + val conn : Connection = getConnection + var stmt : PreparedStatement = null + var rs : ResultSet = null + val s : Season = Season(Range(-1,-1), SeasonType.DAY) + + try { + stmt = prepareAnomalyMetricsGetSqlStatement(conn, anomalyType, startTime, endTime, limit) + rs = stmt.executeQuery + if (anomalyType.equals(AnomalyType.POINT_IN_TIME)) { + while (rs.next()) { + val uuid: Array[Byte] = rs.getBytes("METRIC_UUID") + val timestamp: Long = rs.getLong("ANOMALY_TIMESTAMP") + val metricValue: Double = rs.getDouble("METRIC_VALUE") + val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME")) + val season: Season = Season.deserialize(rs.getString("SEASONAL_INFO")) + val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE") + val modelSnapshot: String = rs.getString("MODEL_PARAMETERS") + + val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) + val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp, + metricValue, methodType, anomalyScore, season, modelSnapshot) + anomalies.+=(anomalyInstance) + } + } else { + while (rs.next()) { + val uuid: Array[Byte] = rs.getBytes("METRIC_UUID") + val anomalyStart: Long = rs.getLong("ANOMALY_PERIOD_START") + val anomalyEnd: Long = rs.getLong("ANOMALY_PERIOD_END") + val referenceStart: Long = rs.getLong("TEST_PERIOD_START") + val referenceEnd: Long = rs.getLong("TEST_PERIOD_END") + val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME")) + val season: Season = 
Season.deserialize(rs.getString("SEASONAL_INFO")) + val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE") + val modelSnapshot: String = rs.getString("MODEL_PARAMETERS") + + val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) + val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey, + TimeRange(anomalyStart, anomalyEnd), + TimeRange(referenceStart, referenceEnd), + methodType, anomalyScore, season, modelSnapshot) + anomalies.+=(anomalyInstance) + } + } + } catch { + case e: SQLException => throw e + } + + anomalies + } + + @throws[SQLException] + def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = { + + val sb = new StringBuilder + + if (anomalyType.equals(AnomalyType.POINT_IN_TIME)) { + sb.++=(String.format(PhoenixQueryConstants.GET_PIT_ANOMALY_METRIC_SQL, PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME)) + } else { + sb.++=(String.format(PhoenixQueryConstants.GET_TREND_ANOMALY_METRIC_SQL, PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME)) + } + + sb.append(" LIMIT " + limit) + var stmt: java.sql.PreparedStatement = null + try { + stmt = connection.prepareStatement(sb.toString) + var pos = 1 + + pos += 1 + stmt.setLong(pos, startTime) + + stmt.setLong(pos, endTime) + + stmt.setFetchSize(limit) + + } catch { + case e: SQLException => + if (stmt != null) + stmt + throw e + } + stmt + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala new file mode 100644 index 0000000..5379c91 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.db + +object PhoenixQueryConstants { + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* Table Name constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val METRIC_PROFILE_TABLE_NAME = "METRIC_PROFILE" + val METHOD_PARAMETERS_TABLE_NAME = "METHOD_PARAMETERS" + val PIT_ANOMALY_METRICS_TABLE_NAME = "PIT_METRIC_ANOMALIES" + val TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES" + val MODEL_SNAPSHOT = "MODEL_SNAPSHOT" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* CREATE statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + val CREATE_PIT_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "ANOMALY_TIMESTAMP UNSIGNED_LONG NOT NULL, " + + "METRIC_VALUE DOUBLE, " + + "SEASONAL_INFO VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_PARAMETERS VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20) NOT NULL, " + + "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " + + "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_START 
UNSIGNED_LONG NOT NULL, " + + "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " + + "METHOD_NAME VARCHAR, " + + "SEASONAL_INFO VARCHAR, " + + "ANOMALY_SCORE DOUBLE, " + + "MODEL_PARAMETERS VARCHAR, " + + "DETECTION_TIME UNSIGNED_LONG " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'" + + val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" + + "METRIC_UUID BINARY(20), " + + "METHOD_NAME VARCHAR, " + + "METHOD_TYPE VARCHAR, " + + "PARAMETERS VARCHAR " + + "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " + + "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* UPSERT statement constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)" + + val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " + + "SEASONAL_INFO, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, " + + "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + + val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)" + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + /* GET statement 
constants */ + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + val GET_METHOD_PARAMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s" + + val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " + + "ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME FROM %s WHERE ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " + + "ORDER BY ANOMALY_SCORE DESC" + + val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " + + "TEST_PERIOD_END, METHOD_NAME, SEASONAL_INFO, ANOMALY_SCORE, MODEL_PARAMETERS, DETECTION_TIME FROM %s WHERE ANOMALY_PERIOD_END > ? " + + "AND ANOMALY_PERIOD_END <= ? ORDER BY ANOMALY_SCORE DESC" + + val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE UUID = %s AND METHOD_NAME = %s" + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala new file mode 100644 index 0000000..801c5f5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.metrics.adservice.metadata

import java.net.{HttpURLConnection, URL}

import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

/**
  * Class to invoke Metrics Collector metadata API.
  * TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata.
  */
class ADMetadataProvider extends MetricMetadataProvider {

  var metricCollectorHostPorts: Array[String] = Array.empty[String]
  var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"

  val connectTimeout: Int = 10000
  val readTimeout: Int = 10000
  //TODO: Add retries for metrics collector GET call.
  //val retries: Long = 5

  /**
    * Creates a provider from the collector configuration: the comma separated host/port list
    * (when non-empty) and the metadata endpoint path.
    */
  def this(configuration: MetricCollectorConfiguration) {
    this()
    if (StringUtils.isNotEmpty(configuration.getHostPortList)) {
      metricCollectorHostPorts = configuration.getHostPortList.split(",")
    }
    metricMetadataPath = configuration.getMetadataEndpoint
  }

  /**
    * Fetches the metric keys for every metric definition of a source definition.
    *
    * Each configured collector is queried for each definition. When several collectors answer
    * for the same definition, the last answer wins in the per-definition map, while the flat
    * set accumulates the keys of all answers.
    *
    * @param metricSourceDefinition source definition whose metric definitions are resolved
    * @return a pair of (keys per metric definition, all keys seen)
    */
  override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition,
    Set[MetricKey]], Set[MetricKey]) = {

    val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]()
    val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]

    for (metricDef <- metricSourceDefinition.metricDefinitions) {
      for (hostPort <- metricCollectorHostPorts) {
        // NOTE(review): the URL is host:port + path with no scheme prefix; presumably the
        // configured host/port entries already carry "http://" - confirm.
        val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef)
        if (metricKeys != null) {
          keysMap += (metricDef -> metricKeys)
          // In-place add; a plain ++ would build a new set and discard it.
          metricKeySet ++= metricKeys
        }
      }
    }
    (keysMap.toMap, metricKeySet.toSet)
  }

  /**
    * Make Metrics Collector REST API call to fetch keys.
    *
    * @param url full URL of the collector metadata endpoint
    * @param metricDefinition definition the keys are requested for; NOTE(review): currently not
    *                         used to build or filter the request
    * @return the keys parsed from the response, or null when the call or the parsing fails with
    *         an IOException (callers must null-check)
    */
  def getKeysFromMetricsCollector(url: String, metricDefinition: MetricDefinition): Set[MetricKey] = {

    val mapper = new ObjectMapper() with ScalaObjectMapper
    try {
      val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
      connection.setConnectTimeout(connectTimeout)
      connection.setReadTimeout(readTimeout)
      connection.setRequestMethod("GET")
      val inputStream = connection.getInputStream
      // Always release the stream, including when reading the body fails.
      val content =
        try scala.io.Source.fromInputStream(inputStream).mkString
        finally inputStream.close()
      fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content))
    } catch {
      // Best effort: an unreachable collector or an unreadable response yields "no answer".
      // SocketTimeoutException is an IOException, so one case covers both.
      case _: java.io.IOException => null // TODO: handle this (log / retry)
    }
  }

  /**
    * Converts collector-side timeline metric keys into [[MetricKey]] instances.
    *
    * @param timelineMetricKeys keys as returned by the collector
    * @return the converted keys as an immutable set
    */
  def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = {
    val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
    val iter = timelineMetricKeys.iterator()
    while (iter.hasNext) {
      val timelineMetricKey: TimelineMetricKey = iter.next()
      metricKeySet += MetricKey(timelineMetricKey.metricName,
        timelineMetricKey.appId,
        timelineMetricKey.instanceId,
        timelineMetricKey.hostName,
        timelineMetricKey.uuid)
    }
    metricKeySet.toSet
  }

}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala new file mode 100644 index 0000000..cc66c90 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.metadata + +import java.io.File + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper + +object InputMetricDefinitionParser { + + def parseInputDefinitionsFromDirectory(directory: String): List[MetricSourceDefinition] = { + + if (directory == null) { + return List.empty[MetricSourceDefinition] + } + val mapper = new ObjectMapper() with ScalaObjectMapper + + def metricSourceDefinitions: List[MetricSourceDefinition] = + for { + file <- getFilesInDirectory(directory) + definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file) + if definition != null + } yield definition + + metricSourceDefinitions + } + + private def getFilesInDirectory(directory: String): List[File] = { + val dir = new File(directory) + if (dir.exists && dir.isDirectory) { + dir.listFiles.filter(_.isFile).toList + } else { + List[File]() + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala new file mode 100644 index 0000000..0a5e51f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ambari.metrics.adservice.metadata

/*
   {
       "metric-name": "mem_free",
       "metric-description" : "Free memory on a Host.",
       "troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
       "static-threshold" : 10,
       "app-id" : "HOST"
}
 */

/**
 * Definition of a single metric of interest.
 *
 * Identity is defined by `metricName` and `appId` only: two definitions with the same name and
 * app id are equal (and hash alike) regardless of hosts, description, troubleshooting info or
 * threshold.
 */
case class MetricDefinition (metricName: String,
                             appId: String,
                             hosts: List[String],
                             metricDescription: String,
                             troubleshootingInfo: String,
                             staticThreshold: Double) {

  /** Equal when the other object is a MetricDefinition of the same class with the same metricName and appId. */
  override def equals(obj: scala.Any): Boolean = obj match {
    case that: MetricDefinition if getClass eq that.getClass =>
      metricName == that.metricName && appId == that.appId
    case _ => false
  }

  /**
   * Hash over exactly the fields used by equals, so that equal definitions land in the same
   * bucket when used as map keys or set members. `##` is null-safe.
   */
  override def hashCode(): Int = (metricName, appId).##
}

object MetricDefinition {

  /** Builds a fully specified definition. */
  def apply(metricName: String,
            appId: String,
            hosts: List[String],
            metricDescription: String,
            troubleshootingInfo: String,
            staticThreshold: Double): MetricDefinition =
    new MetricDefinition(metricName, appId, hosts, metricDescription, troubleshootingInfo, staticThreshold)

  /** Builds a definition with no description, no troubleshooting info and a threshold of -1. */
  def apply(metricName: String, appId: String, hosts: List[String]): MetricDefinition =
    new MetricDefinition(metricName, appId, hosts, null, null, -1)

}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a42cbc5f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala new file mode 100644 index 0000000..afad617 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.metadata + +case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) { + + @Override + override def toString: String = { + "MetricName=" + metricName + ",App=" + appId + ",InstanceId=" + instanceId + ",Host=" + hostname + } + + @Override + override def equals(obj: scala.Any): Boolean = { + + if (obj == null || (getClass ne obj.getClass)) + return false + + val that = obj.asInstanceOf[MetricKey] + + if (!(metricName == that.metricName)) + return false + + if (!(appId == that.appId)) + return false + + if (!(instanceId == that.instanceId)) + return false + + if (!(hostname == that.hostname)) + return false + + true + } +}
