This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch branch-feature-AMBARI-23212 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit af6c91baf490bce52ae340c11158a2faa3fb8211 Author: Siddharth Wagle <swa...@hortonworks.com> AuthorDate: Fri Nov 3 10:06:12 2017 -0700 AMBARI-22365. Add storage support for storing metric definitions using LevelDB. (swagle) --- .../pom.xml | 19 +++- .../src/main/resources/config.yml | 8 ++ .../adservice/app/AnomalyDetectionAppConfig.scala | 11 ++- .../adservice/app/AnomalyDetectionAppModule.scala | 4 +- .../MetricDefinitionDBConfiguration.scala | 38 ++++++++ .../metrics/adservice/db/MetadataDatasource.scala | 73 +++++++++++++++ .../adservice/leveldb/LevelDBDatasource.scala | 102 +++++++++++++++++++++ .../adservice/leveldb/LevelDBDataSourceTest.scala | 57 ++++++++++++ 8 files changed, 306 insertions(+), 6 deletions(-) diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml index 44bdc1f..cfa8124 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml @@ -424,11 +424,18 @@ <artifactId>jackson-datatype-jdk8</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> + <groupId>org.fusesource.leveldbjni</groupId> + <artifactId>leveldbjni-all</artifactId> + <version>1.8</version> + </dependency> + <dependency> + <groupId>org.iq80.leveldb</groupId> + <artifactId>leveldb</artifactId> + <version>0.9</version> </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -452,5 +459,11 @@ <version>2.5</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.8.4</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml index 920c50c..299a472 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml @@ -30,6 +30,14 @@ metricsCollector: adQueryService: anomalyDataTtl: 604800 +metricDefinitionDB: + # force checksum verification of all data that is read from the file system on behalf of a particular read + verifyChecksums: true + # raise an error as soon as it detects an internal corruption + performParanoidChecks: false + # Path to Level DB directory + dbDirPath: /var/lib/ambari-metrics-anomaly-detection/ + #subsystemService: # spark: # pointInTime: diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala index c1ef0d1..aa20223 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala @@ -20,10 +20,9 @@ package org.apache.ambari.metrics.adservice.app import javax.validation.Valid -import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricDefinitionServiceConfiguration} +import org.apache.ambari.metrics.adservice.configuration._ import com.fasterxml.jackson.annotation.JsonProperty - import io.dropwizard.Configuration /** @@ -46,6 +45,12 @@ class AnomalyDetectionAppConfig extends Configuration { @Valid private val 
adServiceConfiguration = new AdServiceConfiguration + /** + * LevelDB settings for metrics definitions + */ + @Valid + private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration + /* HBase Conf */ @@ -66,4 +71,6 @@ class AnomalyDetectionAppConfig extends Configuration { @JsonProperty("metricsCollector") def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration + @JsonProperty("metricDefinitionDB") + def getMetricDefinitionDBConfiguration: MetricDefinitionDBConfiguration = metricDefinitionDBConfiguration } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala index 7425a7e..28b2880 100644 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala @@ -17,13 +17,14 @@ */ package org.apache.ambari.metrics.adservice.app +import org.apache.ambari.metrics.adservice.db.MetadataDatasource +import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource} import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl} import com.codahale.metrics.health.HealthCheck import com.google.inject.AbstractModule import com.google.inject.multibindings.Multibinder - import io.dropwizard.setup.Environment class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule { @@ -35,5 +36,6 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm bind(classOf[AnomalyResource]) 
bind(classOf[RootResource]) bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl]) + bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource]) } } diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala new file mode 100644 index 0000000..79a350c --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.configuration + +import javax.validation.constraints.NotNull + +import com.fasterxml.jackson.annotation.JsonProperty + +class MetricDefinitionDBConfiguration { + + @NotNull + private var dbDirPath: String = _ + + @JsonProperty("verifyChecksums") + def verifyChecksums: Boolean = true + + @JsonProperty("performParanoidChecks") + def performParanoidChecks: Boolean = false + + @JsonProperty("dbDirPath") + def getDbDirPath: String = dbDirPath +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala new file mode 100644 index 0000000..aa6694a --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.db + +trait MetadataDatasource { + + type Key = Array[Byte] + type Value = Array[Byte] + + /** + * Idempotent call at the start of the application to initialize db + */ + def initialize(): Unit + + /** + * This function obtains the associated value to a key. It requires the (key-value) pair to be in the DataSource and fails otherwise. + * + * @param key + * @return the value associated with the passed key. + */ + def apply(key: Key): Value = get(key).get + + /** + * This function obtains the associated value to a key, if there exists one. + * + * @param key + * @return the value associated with the passed key. + */ + def get(key: Key): Option[Value] + + + /** + * This function associates a key to a value, overwriting if necessary + */ + def put(key: Key, value: Value): Unit + + /** + * Delete key from the db + */ + def delete(key: Key): Unit + + /** + * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs. + * + * @param toRemove which includes all the keys to be removed from the DataSource. + * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource. + * If a key is already in the DataSource its value will be updated. + * @return nothing; this DataSource itself is updated with the removals and insertions. + */ + def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit + + /** + * This function closes the DataSource, without deleting the files used by it. 
+ */ + def close(): Unit + +} diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala new file mode 100644 index 0000000..6d185bf --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.metrics.adservice.leveldb + +import java.io.File + +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration +import org.apache.ambari.metrics.adservice.db.MetadataDatasource +import org.iq80.leveldb.{DB, Options, WriteOptions} +import org.iq80.leveldb.impl.Iq80DBFactory + +import com.google.inject.Singleton + +@Singleton +class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource { + + private var db: DB = _ + @volatile var isInitialized: Boolean = false + + override def initialize(): Unit = { + if (isInitialized) return + + val configuration: MetricDefinitionDBConfiguration = appConfig.getMetricDefinitionDBConfiguration + + db = createDB(new LevelDbConfig { + override val createIfMissing: Boolean = true + override val verifyChecksums: Boolean = configuration.verifyChecksums + override val paranoidChecks: Boolean = configuration.performParanoidChecks + override val path: String = configuration.getDbDirPath + }) + isInitialized = true + } + + private def createDB(levelDbConfig: LevelDbConfig): DB = { + import levelDbConfig._ + + val options = new Options() + .createIfMissing(createIfMissing) + .paranoidChecks(paranoidChecks) // raise an error as soon as it detects an internal corruption + .verifyChecksums(verifyChecksums) // force checksum verification of all data that is read from the file system on behalf of a particular read + + Iq80DBFactory.factory.open(new File(path), options) + } + + override def close(): Unit = { + db.close() + } + + /** + * This function obtains the associated value to a key, if there exists one. + * + * @param key + * @return the value associated with the passed key. + */ + override def get(key: Key): Option[Value] = Option(db.get(key)) + + /** + * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs. 
+ * + * @param toRemove which includes all the keys to be removed from the DataSource. + * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource. + * If a key is already in the DataSource its value will be updated. + */ + override def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit = { + val batch = db.createWriteBatch() + toRemove.foreach { key => batch.delete(key) } + toUpsert.foreach { item => batch.put(item._1, item._2) } + db.write(batch, new WriteOptions()) + } + + override def put(key: Key, value: Value): Unit = { + db.put(key, value) + } + + override def delete(key: Key): Unit = { + db.delete(key) + } +} + +trait LevelDbConfig { + val createIfMissing: Boolean + val paranoidChecks: Boolean + val verifyChecksums: Boolean + val path: String +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala new file mode 100644 index 0000000..2ddb7b8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.leveldb + +import java.io.File + +import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig +import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration +import org.iq80.leveldb.util.FileUtils +import org.mockito.Mockito.when +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.mockito.MockitoSugar + +class LevelDBDataSourceTest extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar { + + var db: LevelDBDataSource = _ + var file : File = FileUtils.createTempDir("adservice-leveldb-test") + + before { + val appConfig: AnomalyDetectionAppConfig = mock[AnomalyDetectionAppConfig] + val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration] + + when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig) + when(mdConfig.verifyChecksums).thenReturn(true) + when(mdConfig.performParanoidChecks).thenReturn(false) + when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath) + + db = new LevelDBDataSource(appConfig) + db.initialize() + } + + test("testOperations") { + db.put("Hello".getBytes(), "World".getBytes()) + assert(db.get("Hello".getBytes()).get.sameElements("World".getBytes())) + db.update(Seq("Hello".getBytes()), Seq(("Hello".getBytes(), "Mars".getBytes()))) + assert(db.get("Hello".getBytes()).get.sameElements("Mars".getBytes())) + } + + after { + FileUtils.deleteRecursively(file) + } +} -- To stop receiving notification emails like this one, please contact avija...@apache.org.