This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a92d14c  Enable publishing metrics to Prometheus. (#4227)
a92d14c is described below

commit a92d14c3cd685ec8c9caa530606a82bb9aed9405
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Feb 12 07:38:35 2019 -0800

    Enable publishing metrics to Prometheus. (#4227)
---
 ansible/roles/nginx/templates/nginx.conf.j2        |   4 +
 common/scala/build.gradle                          |   3 +
 common/scala/src/main/resources/application.conf   |  23 ++++
 .../org/apache/openwhisk/common/Logging.scala      | 152 ++++++++++++---------
 .../org/apache/openwhisk/common/Prometheus.scala   |  51 +++++++
 .../apache/openwhisk/common/TransactionId.scala    |  11 +-
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   1 +
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |   5 +-
 .../apache/openwhisk/http/BasicHttpService.scala   |   3 +-
 .../apache/openwhisk/http/BasicRasService.scala    |   6 +-
 .../apache/openwhisk/common/PrometheusTests.scala  |  86 ++++++++++++
 11 files changed, 272 insertions(+), 73 deletions(-)

diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 
b/ansible/roles/nginx/templates/nginx.conf.j2
index cefea41..d05f779 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -148,6 +148,10 @@ http {
             rewrite /cli/go/download(.*) /cli$1 permanent;
         }
 
+        location /metrics {
+            deny all;
+        }
+
 {% if nginx.htmldir %}
         location /ui {
             alias /usr/share/nginx/html;
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 642816a..7d14644 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -71,6 +71,9 @@ dependencies {
     compile ('io.kamon:kamon-system-metrics_2.12:1.0.0') {
         exclude group: 'io.kamon', module: 'sigar-loader'
     }
+    compile ('io.kamon:kamon-prometheus_2.12:1.1.1'){
+        exclude group: 'org.nanohttpd'
+    }
     //for mesos
     compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.17'
 
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 857151e..cf8a3ee 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -50,6 +50,10 @@ kamon {
         # disable the host metrics as we are only interested in JVM metrics
         host.enabled = false
     }
+    prometheus {
+        # We expose the metrics endpoint over akka http. So default server is 
disabled
+        start-embedded-http-server = no
+    }
 
     reporters = [
         "kamon.statsd.StatsDReporter"
@@ -57,6 +61,25 @@ kamon {
 }
 
 whisk {
+    metrics {
+        # Enable/disable Prometheus support. If enabled then metrics would be 
exposed at `/metrics` endpoint
+        # If Prometheus is enabled then please review 
`kamon.metric.tick-interval` (set to 1 sec by default above).
+        # It can then be set to scrape interval value which is generally 60 
secs
+        prometheus-enabled = false
+
+        # Enable/disable whether metric information is sent to the configured 
reporters.
+        kamon-enabled      = false
+        kamon-enabled      = ${?METRICS_KAMON}
+
+        # Enable/disable whether to use the Kamon tags when sending metrics.
+        kamon-tags-enabled = false
+        kamon-tags-enabled = ${?METRICS_KAMON_TAGS}
+
+        # Enable/disable whether the metric information is written out to the 
log files in logmarker format.
+        logs-enabled       = true
+        logs-enabled       = ${?METRICS_LOG}
+    }
+
     # kafka related configuration, the complete list of parameters is here:
     # https://kafka.apache.org/documentation/#brokerconfigs
     kafka {
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 1eb0083..e5ff182 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -24,7 +24,7 @@ import java.time.format.DateTimeFormatter
 import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
-import kamon.metric.{Counter => KCounter, Histogram => KHistogram}
+import kamon.metric.{MeasurementUnit, Counter => KCounter, Histogram => 
KHistogram}
 import kamon.statsd.{MetricKeyGenerator, SimpleMetricKeyGenerator}
 import kamon.system.SystemMetrics
 import org.apache.openwhisk.core.entity.ControllerInstanceId
@@ -181,11 +181,12 @@ private object Emitter {
  * @param subAction more specific identifier for "action", like `runc.resume`
  * @param tags tags can be used for whatever granularity you might need.
  */
-case class LogMarkerToken(component: String,
-                          action: String,
-                          state: String,
-                          subAction: Option[String] = None,
-                          tags: Map[String, String] = Map.empty) {
+case class LogMarkerToken(
+  component: String,
+  action: String,
+  state: String,
+  subAction: Option[String] = None,
+  tags: Map[String, String] = Map.empty)(measurementUnit: MeasurementUnit = 
MeasurementUnit.none) {
   private var finishToken: LogMarkerToken = _
   private var errorToken: LogMarkerToken = _
 
@@ -200,14 +201,14 @@ case class LogMarkerToken(component: String,
 
   def asFinish: LogMarkerToken = {
     if (finishToken == null) {
-      finishToken = copy(state = LoggingMarkers.finish)
+      finishToken = copy(state = LoggingMarkers.finish)(measurementUnit)
     }
     finishToken
   }
 
   def asError: LogMarkerToken = {
     if (errorToken == null) {
-      errorToken = copy(state = LoggingMarkers.error)
+      errorToken = copy(state = LoggingMarkers.error)(measurementUnit)
     }
     errorToken
   }
@@ -239,10 +240,10 @@ case class LogMarkerToken(component: String,
   private def createHistogram() = {
     if (TransactionId.metricsKamonTags) {
       Kamon
-        .histogram(createName(toString, "histogram"))
+        .histogram(createName(toString, "histogram"), measurementUnit)
         .refine(tags)
     } else {
-      Kamon.histogram(createName(toStringWithSubAction, "histogram"))
+      Kamon.histogram(createName(toStringWithSubAction, "histogram"), 
measurementUnit)
     }
   }
 
@@ -269,7 +270,7 @@ object LogMarkerToken {
       case a :: s :: _ => (a, Some(s))
     }
 
-    LogMarkerToken(component, generalAction, state, subAction)
+    LogMarkerToken(component, generalAction, state, 
subAction)(MeasurementUnit.none)
   }
 
 }
@@ -327,93 +328,118 @@ object LoggingMarkers {
   /*
    * Controller related markers
    */
-  def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, 
s"startup$id", count)
+  def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, 
s"startup$id", count)(MeasurementUnit.none)
 
   // Time of the activation in controller until it is delivered to Kafka
-  val CONTROLLER_ACTIVATION = LogMarkerToken(controller, activation, start)
-  val CONTROLLER_ACTIVATION_BLOCKING = LogMarkerToken(controller, 
"blockingActivation", start)
+  val CONTROLLER_ACTIVATION =
+    LogMarkerToken(controller, activation, 
start)(MeasurementUnit.time.milliseconds)
+  val CONTROLLER_ACTIVATION_BLOCKING =
+    LogMarkerToken(controller, "blockingActivation", 
start)(MeasurementUnit.time.milliseconds)
   val CONTROLLER_ACTIVATION_BLOCKING_DATABASE_RETRIEVAL =
-    LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", count)
+    LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", 
count)(MeasurementUnit.none)
 
   // Time that is needed to load balance the activation
-  val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, start)
+  val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, 
start)(MeasurementUnit.none)
 
   // Time that is needed to produce message in kafka
-  val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)
+  val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, 
start)(MeasurementUnit.time.milliseconds)
 
   // System overload and random invoker assignment
-  val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller, 
"managedInvokerSystemOverload", count)
-  val BLACKBOX_SYSTEM_OVERLOAD = LogMarkerToken(controller, 
"blackBoxInvokerSystemOverload", count)
+  val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller, 
"managedInvokerSystemOverload", count)(MeasurementUnit.none)
+  val BLACKBOX_SYSTEM_OVERLOAD =
+    LogMarkerToken(controller, "blackBoxInvokerSystemOverload", 
count)(MeasurementUnit.none)
   /*
    * Invoker related markers
    */
-  def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count)
+  def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", 
count)(MeasurementUnit.none)
 
   // Check invoker healthy state from loadbalancer
   def LOADBALANCER_INVOKER_STATUS_CHANGE(state: String) =
-    LogMarkerToken(loadbalancer, "invokerState", count, Some(state))
-  val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, 
"activations", count)
+    LogMarkerToken(loadbalancer, "invokerState", count, 
Some(state))(MeasurementUnit.none)
+  val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, 
"activations", count)(MeasurementUnit.none)
 
   def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: 
ControllerInstanceId) =
-    LogMarkerToken(loadbalancer + controllerInstance.asString, 
"activationsInflight", count)
+    LogMarkerToken(loadbalancer + controllerInstance.asString, 
"activationsInflight", count)(MeasurementUnit.none)
   def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId, 
actionType: String) =
-    LogMarkerToken(loadbalancer + controllerInstance.asString, 
s"memory${actionType}Inflight", count)
+    LogMarkerToken(loadbalancer + controllerInstance.asString, 
s"memory${actionType}Inflight", count)(
+      MeasurementUnit.none)
 
   // Time that is needed to execute the action
-  val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
+  val INVOKER_ACTIVATION_RUN =
+    LogMarkerToken(invoker, "activationRun", 
start)(MeasurementUnit.time.milliseconds)
 
   // Time that is needed to init the action
-  val INVOKER_ACTIVATION_INIT = LogMarkerToken(invoker, "activationInit", 
start)
+  val INVOKER_ACTIVATION_INIT =
+    LogMarkerToken(invoker, "activationInit", 
start)(MeasurementUnit.time.milliseconds)
 
   // Time needed to collect the logs
-  val INVOKER_COLLECT_LOGS = LogMarkerToken(invoker, "collectLogs", start)
+  val INVOKER_COLLECT_LOGS =
+    LogMarkerToken(invoker, "collectLogs", 
start)(MeasurementUnit.time.milliseconds)
 
   // Time in invoker
-  val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
-  def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, "docker", 
start, Some(cmd), Map("cmd" -> cmd))
+  val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, 
start)(MeasurementUnit.none)
+  def INVOKER_DOCKER_CMD(cmd: String) =
+    LogMarkerToken(invoker, "docker", start, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.time.milliseconds)
   def INVOKER_DOCKER_CMD_TIMEOUT(cmd: String) =
-    LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd))
-  def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start, 
Some(cmd), Map("cmd" -> cmd))
-  def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", 
start, Some(cmd), Map("cmd" -> cmd))
+    LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.none)
+  def INVOKER_RUNC_CMD(cmd: String) =
+    LogMarkerToken(invoker, "runc", start, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.time.milliseconds)
+  def INVOKER_KUBECTL_CMD(cmd: String) =
+    LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.none)
   def INVOKER_MESOS_CMD(cmd: String) =
-    LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))
+    LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.time.milliseconds)
   def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) =
-    LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))
+    LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> 
cmd))(MeasurementUnit.none)
   def INVOKER_CONTAINER_START(containerState: String) =
-    LogMarkerToken(invoker, "containerStart", count, Some(containerState), 
Map("containerState" -> containerState))
+    LogMarkerToken(invoker, "containerStart", count, Some(containerState), 
Map("containerState" -> containerState))(
+      MeasurementUnit.none)
   val CONTAINER_CLIENT_RETRIES =
-    LogMarkerToken(containerClient, "retries", count)
-
-  val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalCapacityBlackBox", count)
-  val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, 
"totalCapacityManaged", count)
-
-  val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalHealthyInvokerManaged", count)
-  val UNHEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalUnhealthyInvokerManaged", count)
-  val UNRESPONSIVE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalUnresponsiveInvokerManaged", count)
-  val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalOfflineInvokerManaged", count)
-
-  val HEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalHealthyInvokerBlackBox", count)
-  val UNHEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalUnhealthyInvokerBlackBox", count)
-  val UNRESPONSIVE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalUnresponsiveInvokerBlackBox", count)
-  val OFFLINE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalOfflineInvokerBlackBox", count)
+    LogMarkerToken(containerClient, "retries", count)(MeasurementUnit.none)
+
+  val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalCapacityBlackBox", count)(MeasurementUnit.none)
+  val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, 
"totalCapacityManaged", count)(MeasurementUnit.none)
+
+  val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalHealthyInvokerManaged", count)(MeasurementUnit.none)
+  val UNHEALTHY_INVOKER_MANAGED =
+    LogMarkerToken(loadbalancer, "totalUnhealthyInvokerManaged", 
count)(MeasurementUnit.none)
+  val UNRESPONSIVE_INVOKER_MANAGED =
+    LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerManaged", 
count)(MeasurementUnit.none)
+  val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalOfflineInvokerManaged", count)(MeasurementUnit.none)
+
+  val HEALTHY_INVOKER_BLACKBOX =
+    LogMarkerToken(loadbalancer, "totalHealthyInvokerBlackBox", 
count)(MeasurementUnit.none)
+  val UNHEALTHY_INVOKER_BLACKBOX =
+    LogMarkerToken(loadbalancer, "totalUnhealthyInvokerBlackBox", 
count)(MeasurementUnit.none)
+  val UNRESPONSIVE_INVOKER_BLACKBOX =
+    LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerBlackBox", 
count)(MeasurementUnit.none)
+  val OFFLINE_INVOKER_BLACKBOX =
+    LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", 
count)(MeasurementUnit.none)
 
   // Kafka related markers
-  def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
-  def KAFKA_MESSAGE_DELAY(topic: String) = LogMarkerToken(kafka, topic, start, 
Some("delay"))
+  def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, 
count)(MeasurementUnit.none)
+  def KAFKA_MESSAGE_DELAY(topic: String) =
+    LogMarkerToken(kafka, topic, start, 
Some("delay"))(MeasurementUnit.time.milliseconds)
 
   /*
    * General markers
    */
-  val DATABASE_CACHE_HIT = LogMarkerToken(database, "cacheHit", count)
-  val DATABASE_CACHE_MISS = LogMarkerToken(database, "cacheMiss", count)
-  val DATABASE_SAVE = LogMarkerToken(database, "saveDocument", start)
-  val DATABASE_BULK_SAVE = LogMarkerToken(database, "saveDocumentBulk", start)
-  val DATABASE_DELETE = LogMarkerToken(database, "deleteDocument", start)
-  val DATABASE_GET = LogMarkerToken(database, "getDocument", start)
-  val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
-  val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", 
start)
-  val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", 
start)
-  val DATABASE_ATT_DELETE = LogMarkerToken(database, 
"deleteDocumentAttachment", start)
-  val DATABASE_ATTS_DELETE = LogMarkerToken(database, 
"deleteDocumentAttachments", start)
-  val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
+  val DATABASE_CACHE_HIT = LogMarkerToken(database, "cacheHit", 
count)(MeasurementUnit.none)
+  val DATABASE_CACHE_MISS = LogMarkerToken(database, "cacheMiss", 
count)(MeasurementUnit.none)
+  val DATABASE_SAVE =
+    LogMarkerToken(database, "saveDocument", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_BULK_SAVE =
+    LogMarkerToken(database, "saveDocumentBulk", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_DELETE =
+    LogMarkerToken(database, "deleteDocument", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_GET = LogMarkerToken(database, "getDocument", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_QUERY = LogMarkerToken(database, "queryView", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_ATT_GET =
+    LogMarkerToken(database, "getDocumentAttachment", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_ATT_SAVE =
+    LogMarkerToken(database, "saveDocumentAttachment", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_ATT_DELETE =
+    LogMarkerToken(database, "deleteDocumentAttachment", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_ATTS_DELETE =
+    LogMarkerToken(database, "deleteDocumentAttachments", 
start)(MeasurementUnit.time.milliseconds)
+  val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", 
count)(MeasurementUnit.none)
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
new file mode 100644
index 0000000..2c97898
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+import java.nio.charset.StandardCharsets.UTF_8
+
+import akka.http.scaladsl.model.{ContentType, HttpEntity}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import kamon.Kamon
+import kamon.prometheus.PrometheusReporter
+
+class KamonPrometheus extends AutoCloseable {
+  private val reporter = new PrometheusReporter
+  private val v4 = ContentType.parse("text/plain; version=0.0.4; 
charset=utf-8").right.get
+  private val ref = Kamon.addReporter(reporter)
+
+  def route: Route = path("metrics") {
+    get {
+      encodeResponse {
+        complete(getReport())
+      }
+    }
+  }
+
+  private def getReport() = HttpEntity(v4, 
reporter.scrapeData().getBytes(UTF_8))
+
+  override def close(): Unit = ref.cancel()
+}
+
+object MetricsRoute {
+  private val impl =
+    if (TransactionId.metricsKamon && 
TransactionId.metricConfig.prometheusEnabled) Some(new KamonPrometheus)
+    else None
+
+  def apply(): Route = impl.map(_.route).getOrElse(reject)
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 9242ecb..dd320fd 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -198,12 +198,17 @@ case class StartMarker(val start: Instant, startMarker: 
LogMarkerToken)
  */
 protected case class TransactionMetadata(val id: String, val start: Instant, 
val extraLogging: Boolean = false)
 
+case class MetricConfig(prometheusEnabled: Boolean,
+                        kamonEnabled: Boolean,
+                        kamonTagsEnabled: Boolean,
+                        logsEnabled: Boolean)
 object TransactionId {
+  val metricConfig = loadConfigOrThrow[MetricConfig](ConfigKeys.metrics)
 
   // get the metric parameters directly from the environment since WhiskConfig 
can not be instantiated here
-  val metricsKamon: Boolean = 
sys.env.get("METRICS_KAMON").getOrElse("False").toBoolean
-  val metricsKamonTags: Boolean = 
sys.env.get("METRICS_KAMON_TAGS").getOrElse("False").toBoolean
-  val metricsLog: Boolean = 
sys.env.get("METRICS_LOG").getOrElse("True").toBoolean
+  val metricsKamon: Boolean = metricConfig.kamonEnabled
+  val metricsKamonTags: Boolean = metricConfig.kamonTagsEnabled
+  val metricsLog: Boolean = metricConfig.logsEnabled
 
   val generatorConfig = 
loadConfigOrThrow[TransactionGeneratorConfig](ConfigKeys.transactions)
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 32f5a90..dcc9962 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -247,4 +247,5 @@ object ConfigKeys {
   val activationStore = "whisk.activation-store"
   val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
 
+  val metrics = "whisk.metrics"
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index d65761b..a20795b 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -27,6 +27,7 @@ import akka.stream.scaladsl.{Sink, Source, StreamConverters}
 import akka.util.{ByteString, ByteStringBuilder}
 import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import kamon.metric.MeasurementUnit
 import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, 
RootJsonFormat, _}
 import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, 
MetricEmitter, TransactionId}
 import org.apache.openwhisk.core.database.StoreUtils.{checkDocHasRevision, 
deserialize, reportFailure}
@@ -527,7 +528,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   private def createToken(action: String, read: Boolean = true): 
LogMarkerToken = {
     val mode = if (read) "read" else "write"
     val tags = Map("action" -> action, "mode" -> mode, "collection" -> 
collName)
-    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", 
"used", tags = tags)
-    else LogMarkerToken("cosmosdb", "ru", collName, Some(action))
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", 
"used", tags = tags)(MeasurementUnit.none)
+    else LogMarkerToken("cosmosdb", "ru", collName, 
Some(action))(MeasurementUnit.none)
   }
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
index 352be10..3e71fc8 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
@@ -27,6 +27,7 @@ import akka.http.scaladsl.server.RouteResult.Rejected
 import akka.http.scaladsl.server._
 import akka.http.scaladsl.server.directives._
 import akka.stream.ActorMaterializer
+import kamon.metric.MeasurementUnit
 import spray.json._
 import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common._
@@ -145,7 +146,7 @@ trait BasicHttpService extends Directives {
           m.toLowerCase,
           LoggingMarkers.count,
           Some(res.status.intValue.toString),
-          Map("statusCode" -> res.status.intValue.toString))
+          Map("statusCode" -> 
res.status.intValue.toString))(MeasurementUnit.time.milliseconds)
       val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))
 
       MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
index 2ce26d0..7232453 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
@@ -18,9 +18,7 @@
 package org.apache.openwhisk.http
 
 import akka.event.Logging
-
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{MetricsRoute, TransactionId}
 
 /**
  * This trait extends the BasicHttpService with a standard "ping" endpoint 
which
@@ -28,7 +26,7 @@ import org.apache.openwhisk.common.TransactionId
  */
 trait BasicRasService extends BasicHttpService {
 
-  override def routes(implicit transid: TransactionId) = ping
+  override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute()
 
   override def loglevelForRoute(route: String): Logging.LogLevel = {
     if (route == "/ping") {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
new file mode 100644
index 0000000..710d361
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+import akka.http.scaladsl.coding.Gzip
+import akka.http.scaladsl.model.{HttpCharsets, HttpResponse}
+import akka.http.scaladsl.model.headers.HttpEncodings.gzip
+import akka.http.scaladsl.model.headers.{`Accept-Encoding`, 
`Content-Encoding`, HttpEncoding, HttpEncodings}
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.matchers.Matcher
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class PrometheusTests extends FlatSpec with Matchers with ScalatestRouteTest 
with BeforeAndAfterAll with ScalaFutures {
+  behavior of "Prometheus"
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    //Modify Kamon to have a very small tick interval
+    val newConfig = ConfigFactory.parseString("""kamon {
+      |  metric {
+      |    tick-interval = 50 ms
+      |    optimistic-tick-alignment = no
+      |  }
+      |}""".stripMargin).withFallback(ConfigFactory.load())
+    Kamon.reconfigure(newConfig)
+  }
+
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    Kamon.reconfigure(ConfigFactory.load())
+  }
+
+  it should "respond to /metrics" in {
+    val api = new KamonPrometheus
+    Kamon.counter("foo_bar").increment(42)
+
+    //Sleep to ensure that Kamon metrics are pushed to reporters
+    Thread.sleep(2.seconds.toMillis)
+    Get("/metrics") ~> `Accept-Encoding`(gzip) ~> api.route ~> check {
+      // Check that response confirms to what Prometheus scrapper accepts
+      contentType.charsetOption shouldBe Some(HttpCharsets.`UTF-8`)
+      contentType.mediaType.params("version") shouldBe "0.0.4"
+      response should haveContentEncoding(gzip)
+
+      val responseText = 
Unmarshal(Gzip.decodeMessage(response)).to[String].futureValue
+      withClue(responseText) {
+        responseText should include("foo_bar")
+      }
+    }
+    api.close()
+  }
+
+  it should "not be enabled by default" in {
+    Get("/metrics") ~> MetricsRoute() ~> check {
+      handled shouldBe false
+    }
+  }
+
+  private def haveContentEncoding(encoding: HttpEncoding): 
Matcher[HttpResponse] =
+    be(encoding) compose {
+      (_: 
HttpResponse).header[`Content-Encoding`].map(_.encodings.head).getOrElse(HttpEncodings.identity)
+    }
+}

Reply via email to