This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a92d14c Enable publishing metrics to Prometheus. (#4227)
a92d14c is described below
commit a92d14c3cd685ec8c9caa530606a82bb9aed9405
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Feb 12 07:38:35 2019 -0800
Enable publishing metrics to Prometheus. (#4227)
---
ansible/roles/nginx/templates/nginx.conf.j2 | 4 +
common/scala/build.gradle | 3 +
common/scala/src/main/resources/application.conf | 23 ++++
.../org/apache/openwhisk/common/Logging.scala | 152 ++++++++++++---------
.../org/apache/openwhisk/common/Prometheus.scala | 51 +++++++
.../apache/openwhisk/common/TransactionId.scala | 11 +-
.../org/apache/openwhisk/core/WhiskConfig.scala | 1 +
.../database/cosmosdb/CosmosDBArtifactStore.scala | 5 +-
.../apache/openwhisk/http/BasicHttpService.scala | 3 +-
.../apache/openwhisk/http/BasicRasService.scala | 6 +-
.../apache/openwhisk/common/PrometheusTests.scala | 86 ++++++++++++
11 files changed, 272 insertions(+), 73 deletions(-)
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 b/ansible/roles/nginx/templates/nginx.conf.j2
index cefea41..d05f779 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -148,6 +148,10 @@ http {
rewrite /cli/go/download(.*) /cli$1 permanent;
}
+ location /metrics {
+ deny all;
+ }
+
{% if nginx.htmldir %}
location /ui {
alias /usr/share/nginx/html;
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 642816a..7d14644 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -71,6 +71,9 @@ dependencies {
compile ('io.kamon:kamon-system-metrics_2.12:1.0.0') {
exclude group: 'io.kamon', module: 'sigar-loader'
}
+ compile ('io.kamon:kamon-prometheus_2.12:1.1.1'){
+ exclude group: 'org.nanohttpd'
+ }
//for mesos
compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.17'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 857151e..cf8a3ee 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -50,6 +50,10 @@ kamon {
# disable the host metrics as we are only interested in JVM metrics
host.enabled = false
}
+ prometheus {
+ # We expose the metrics endpoint over Akka HTTP, so the default embedded server is disabled.
+ start-embedded-http-server = no
+ }
reporters = [
"kamon.statsd.StatsDReporter"
@@ -57,6 +61,25 @@ kamon {
}
whisk {
+ metrics {
+ # Enable/disable Prometheus support. If enabled, metrics are exposed at the `/metrics` endpoint.
+ # If Prometheus is enabled, please review `kamon.metric.tick-interval` (set to 1 second by default above);
+ # it can then be set to the Prometheus scrape interval, which is typically 60 seconds.
+ prometheus-enabled = false
+
+ # Enable/disable whether metric information is sent to the configured reporters.
+ kamon-enabled = false
+ kamon-enabled = ${?METRICS_KAMON}
+
+ # Enable/disable whether to use the Kamon tags when sending metrics.
+ kamon-tags-enabled = false
+ kamon-tags-enabled = ${?METRICS_KAMON_TAGS}
+
+ # Enable/disable whether the metric information is written out to the log files in logmarker format.
+ logs-enabled = true
+ logs-enabled = ${?METRICS_LOG}
+ }
+
# kafka related configuration, the complete list of parameters is here:
# https://kafka.apache.org/documentation/#brokerconfigs
kafka {
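
As a side note, here is a minimal, self-contained sketch (illustrative only, not part of this patch) of how the new `whisk.metrics` block can be read with the Typesafe Config API. The override values below are hypothetical; in OpenWhisk itself the same keys are mapped onto the MetricConfig case class via pureconfig in TransactionId.scala further down in this diff.

    import com.typesafe.config.ConfigFactory

    object MetricConfigSketch extends App {
      // Hypothetical overrides mirroring the keys added to application.conf above.
      val config = ConfigFactory.parseString("""
        |whisk.metrics {
        |  prometheus-enabled = true
        |  kamon-enabled = true
        |  kamon-tags-enabled = false
        |  logs-enabled = true
        |}""".stripMargin)

      val metrics = config.getConfig("whisk.metrics")
      println(metrics.getBoolean("prometheus-enabled")) // true
      println(metrics.getBoolean("logs-enabled"))       // true
    }

If `prometheus-enabled` is turned on, remember the note above about aligning `kamon.metric.tick-interval` with the Prometheus scrape interval.
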
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 1eb0083..e5ff182 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -24,7 +24,7 @@ import java.time.format.DateTimeFormatter
import akka.event.Logging._
import akka.event.LoggingAdapter
import kamon.Kamon
-import kamon.metric.{Counter => KCounter, Histogram => KHistogram}
+import kamon.metric.{MeasurementUnit, Counter => KCounter, Histogram => KHistogram}
import kamon.statsd.{MetricKeyGenerator, SimpleMetricKeyGenerator}
import kamon.system.SystemMetrics
import org.apache.openwhisk.core.entity.ControllerInstanceId
@@ -181,11 +181,12 @@ private object Emitter {
* @param subAction more specific identifier for "action", like `runc.resume`
* @param tags tags can be used for whatever granularity you might need.
*/
-case class LogMarkerToken(component: String,
- action: String,
- state: String,
- subAction: Option[String] = None,
- tags: Map[String, String] = Map.empty) {
+case class LogMarkerToken(
+ component: String,
+ action: String,
+ state: String,
+ subAction: Option[String] = None,
+ tags: Map[String, String] = Map.empty)(measurementUnit: MeasurementUnit = MeasurementUnit.none) {
private var finishToken: LogMarkerToken = _
private var errorToken: LogMarkerToken = _
@@ -200,14 +201,14 @@ case class LogMarkerToken(component: String,
def asFinish: LogMarkerToken = {
if (finishToken == null) {
- finishToken = copy(state = LoggingMarkers.finish)
+ finishToken = copy(state = LoggingMarkers.finish)(measurementUnit)
}
finishToken
}
def asError: LogMarkerToken = {
if (errorToken == null) {
- errorToken = copy(state = LoggingMarkers.error)
+ errorToken = copy(state = LoggingMarkers.error)(measurementUnit)
}
errorToken
}
@@ -239,10 +240,10 @@ case class LogMarkerToken(component: String,
private def createHistogram() = {
if (TransactionId.metricsKamonTags) {
Kamon
- .histogram(createName(toString, "histogram"))
+ .histogram(createName(toString, "histogram"), measurementUnit)
.refine(tags)
} else {
- Kamon.histogram(createName(toStringWithSubAction, "histogram"))
+ Kamon.histogram(createName(toStringWithSubAction, "histogram"), measurementUnit)
}
}
@@ -269,7 +270,7 @@ object LogMarkerToken {
case a :: s :: _ => (a, Some(s))
}
- LogMarkerToken(component, generalAction, state, subAction)
+ LogMarkerToken(component, generalAction, state, subAction)(MeasurementUnit.none)
}
}
@@ -327,93 +328,118 @@ object LoggingMarkers {
/*
* Controller related markers
*/
- def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, s"startup$id", count)
+ def CONTROLLER_STARTUP(id: String) = LogMarkerToken(controller, s"startup$id", count)(MeasurementUnit.none)
// Time of the activation in controller until it is delivered to Kafka
- val CONTROLLER_ACTIVATION = LogMarkerToken(controller, activation, start)
- val CONTROLLER_ACTIVATION_BLOCKING = LogMarkerToken(controller, "blockingActivation", start)
+ val CONTROLLER_ACTIVATION =
+ LogMarkerToken(controller, activation, start)(MeasurementUnit.time.milliseconds)
+ val CONTROLLER_ACTIVATION_BLOCKING =
+ LogMarkerToken(controller, "blockingActivation", start)(MeasurementUnit.time.milliseconds)
val CONTROLLER_ACTIVATION_BLOCKING_DATABASE_RETRIEVAL =
- LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", count)
+ LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", count)(MeasurementUnit.none)
// Time that is needed to load balance the activation
- val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, start)
+ val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, start)(MeasurementUnit.none)
// Time that is needed to produce message in kafka
- val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)
+ val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)(MeasurementUnit.time.milliseconds)
// System overload and random invoker assignment
- val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller, "managedInvokerSystemOverload", count)
- val BLACKBOX_SYSTEM_OVERLOAD = LogMarkerToken(controller, "blackBoxInvokerSystemOverload", count)
+ val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller, "managedInvokerSystemOverload", count)(MeasurementUnit.none)
+ val BLACKBOX_SYSTEM_OVERLOAD =
+ LogMarkerToken(controller, "blackBoxInvokerSystemOverload", count)(MeasurementUnit.none)
/*
* Invoker related markers
*/
- def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count)
+ def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count)(MeasurementUnit.none)
// Check invoker healthy state from loadbalancer
def LOADBALANCER_INVOKER_STATUS_CHANGE(state: String) =
- LogMarkerToken(loadbalancer, "invokerState", count, Some(state))
- val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)
+ LogMarkerToken(loadbalancer, "invokerState", count, Some(state))(MeasurementUnit.none)
+ val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)(MeasurementUnit.none)
def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
- LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count)
+ LogMarkerToken(loadbalancer + controllerInstance.asString, "activationsInflight", count)(MeasurementUnit.none)
def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId, actionType: String) =
- LogMarkerToken(loadbalancer + controllerInstance.asString, s"memory${actionType}Inflight", count)
+ LogMarkerToken(loadbalancer + controllerInstance.asString, s"memory${actionType}Inflight", count)(
+ MeasurementUnit.none)
// Time that is needed to execute the action
- val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
+ val INVOKER_ACTIVATION_RUN =
+ LogMarkerToken(invoker, "activationRun", start)(MeasurementUnit.time.milliseconds)
// Time that is needed to init the action
- val INVOKER_ACTIVATION_INIT = LogMarkerToken(invoker, "activationInit", start)
+ val INVOKER_ACTIVATION_INIT =
+ LogMarkerToken(invoker, "activationInit", start)(MeasurementUnit.time.milliseconds)
// Time needed to collect the logs
- val INVOKER_COLLECT_LOGS = LogMarkerToken(invoker, "collectLogs", start)
+ val INVOKER_COLLECT_LOGS =
+ LogMarkerToken(invoker, "collectLogs", start)(MeasurementUnit.time.milliseconds)
// Time in invoker
- val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)
- def INVOKER_DOCKER_CMD(cmd: String) = LogMarkerToken(invoker, "docker", start, Some(cmd), Map("cmd" -> cmd))
+ val INVOKER_ACTIVATION = LogMarkerToken(invoker, activation, start)(MeasurementUnit.none)
+ def INVOKER_DOCKER_CMD(cmd: String) =
+ LogMarkerToken(invoker, "docker", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
def INVOKER_DOCKER_CMD_TIMEOUT(cmd: String) =
- LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd))
- def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, "runc", start, Some(cmd), Map("cmd" -> cmd))
- def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd))
+ LogMarkerToken(invoker, "docker", timeout, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)
+ def INVOKER_RUNC_CMD(cmd: String) =
+ LogMarkerToken(invoker, "runc", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
+ def INVOKER_KUBECTL_CMD(cmd: String) =
+ LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)
def INVOKER_MESOS_CMD(cmd: String) =
- LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))
+ LogMarkerToken(invoker, "mesos", start, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.time.milliseconds)
def INVOKER_MESOS_CMD_TIMEOUT(cmd: String) =
- LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))
+ LogMarkerToken(invoker, "mesos", timeout, Some(cmd), Map("cmd" -> cmd))(MeasurementUnit.none)
def INVOKER_CONTAINER_START(containerState: String) =
- LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState))
+ LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState))(
+ MeasurementUnit.none)
val CONTAINER_CLIENT_RETRIES =
- LogMarkerToken(containerClient, "retries", count)
-
- val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", count)
- val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", count)
-
- val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalHealthyInvokerManaged", count)
- val UNHEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalUnhealthyInvokerManaged", count)
- val UNRESPONSIVE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerManaged", count)
- val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalOfflineInvokerManaged", count)
-
- val HEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, "totalHealthyInvokerBlackBox", count)
- val UNHEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, "totalUnhealthyInvokerBlackBox", count)
- val UNRESPONSIVE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerBlackBox", count)
- val OFFLINE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", count)
+ LogMarkerToken(containerClient, "retries", count)(MeasurementUnit.none)
+
+ val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, "totalCapacityBlackBox", count)(MeasurementUnit.none)
+ val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, "totalCapacityManaged", count)(MeasurementUnit.none)
+
+ val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalHealthyInvokerManaged", count)(MeasurementUnit.none)
+ val UNHEALTHY_INVOKER_MANAGED =
+ LogMarkerToken(loadbalancer, "totalUnhealthyInvokerManaged", count)(MeasurementUnit.none)
+ val UNRESPONSIVE_INVOKER_MANAGED =
+ LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerManaged", count)(MeasurementUnit.none)
+ val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, "totalOfflineInvokerManaged", count)(MeasurementUnit.none)
+
+ val HEALTHY_INVOKER_BLACKBOX =
+ LogMarkerToken(loadbalancer, "totalHealthyInvokerBlackBox", count)(MeasurementUnit.none)
+ val UNHEALTHY_INVOKER_BLACKBOX =
+ LogMarkerToken(loadbalancer, "totalUnhealthyInvokerBlackBox", count)(MeasurementUnit.none)
+ val UNRESPONSIVE_INVOKER_BLACKBOX =
+ LogMarkerToken(loadbalancer, "totalUnresponsiveInvokerBlackBox", count)(MeasurementUnit.none)
+ val OFFLINE_INVOKER_BLACKBOX =
+ LogMarkerToken(loadbalancer, "totalOfflineInvokerBlackBox", count)(MeasurementUnit.none)
// Kafka related markers
- def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
- def KAFKA_MESSAGE_DELAY(topic: String) = LogMarkerToken(kafka, topic, start, Some("delay"))
+ def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)(MeasurementUnit.none)
+ def KAFKA_MESSAGE_DELAY(topic: String) =
+ LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)
/*
* General markers
*/
- val DATABASE_CACHE_HIT = LogMarkerToken(database, "cacheHit", count)
- val DATABASE_CACHE_MISS = LogMarkerToken(database, "cacheMiss", count)
- val DATABASE_SAVE = LogMarkerToken(database, "saveDocument", start)
- val DATABASE_BULK_SAVE = LogMarkerToken(database, "saveDocumentBulk", start)
- val DATABASE_DELETE = LogMarkerToken(database, "deleteDocument", start)
- val DATABASE_GET = LogMarkerToken(database, "getDocument", start)
- val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)
- val DATABASE_ATT_GET = LogMarkerToken(database, "getDocumentAttachment", start)
- val DATABASE_ATT_SAVE = LogMarkerToken(database, "saveDocumentAttachment", start)
- val DATABASE_ATT_DELETE = LogMarkerToken(database, "deleteDocumentAttachment", start)
- val DATABASE_ATTS_DELETE = LogMarkerToken(database, "deleteDocumentAttachments", start)
- val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)
+ val DATABASE_CACHE_HIT = LogMarkerToken(database, "cacheHit", count)(MeasurementUnit.none)
+ val DATABASE_CACHE_MISS = LogMarkerToken(database, "cacheMiss", count)(MeasurementUnit.none)
+ val DATABASE_SAVE =
+ LogMarkerToken(database, "saveDocument", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_BULK_SAVE =
+ LogMarkerToken(database, "saveDocumentBulk", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_DELETE =
+ LogMarkerToken(database, "deleteDocument", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_GET = LogMarkerToken(database, "getDocument", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_QUERY = LogMarkerToken(database, "queryView", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_ATT_GET =
+ LogMarkerToken(database, "getDocumentAttachment", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_ATT_SAVE =
+ LogMarkerToken(database, "saveDocumentAttachment", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_ATT_DELETE =
+ LogMarkerToken(database, "deleteDocumentAttachment", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_ATTS_DELETE =
+ LogMarkerToken(database, "deleteDocumentAttachments", start)(MeasurementUnit.time.milliseconds)
+ val DATABASE_BATCH_SIZE = LogMarkerToken(database, "batchSize", count)(MeasurementUnit.none)
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
new file mode 100644
index 0000000..2c97898
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Prometheus.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+import java.nio.charset.StandardCharsets.UTF_8
+
+import akka.http.scaladsl.model.{ContentType, HttpEntity}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import kamon.Kamon
+import kamon.prometheus.PrometheusReporter
+
+class KamonPrometheus extends AutoCloseable {
+ private val reporter = new PrometheusReporter
+ private val v4 = ContentType.parse("text/plain; version=0.0.4; charset=utf-8").right.get
+ private val ref = Kamon.addReporter(reporter)
+
+ def route: Route = path("metrics") {
+ get {
+ encodeResponse {
+ complete(getReport())
+ }
+ }
+ }
+
+ private def getReport() = HttpEntity(v4, reporter.scrapeData().getBytes(UTF_8))
+
+ override def close(): Unit = ref.cancel()
+}
+
+object MetricsRoute {
+ private val impl =
+ if (TransactionId.metricsKamon && TransactionId.metricConfig.prometheusEnabled) Some(new KamonPrometheus)
+ else None
+
+ def apply(): Route = impl.map(_.route).getOrElse(reject)
+}
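
For illustration only (not part of this patch), here is a minimal Akka HTTP sketch of how MetricsRoute() composes with another endpoint, which is essentially what the BasicRasService change further down in this diff does. The actor-system name, bind address, and port are hypothetical; /metrics is only served when `whisk.metrics.prometheus-enabled` and `kamon-enabled` are set, otherwise MetricsRoute() simply rejects.

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import org.apache.openwhisk.common.MetricsRoute

    object MetricsServerSketch extends App {
      implicit val system: ActorSystem = ActorSystem("metrics-sketch") // hypothetical name
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      // /ping stays available; /metrics falls through as unhandled when Prometheus support is disabled.
      val routes = path("ping") { complete("pong") } ~ MetricsRoute()

      Http().bindAndHandle(routes, "0.0.0.0", 8080) // hypothetical bind address and port
    }
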
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index 9242ecb..dd320fd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -198,12 +198,17 @@ case class StartMarker(val start: Instant, startMarker: LogMarkerToken)
*/
protected case class TransactionMetadata(val id: String, val start: Instant, val extraLogging: Boolean = false)
+case class MetricConfig(prometheusEnabled: Boolean,
+ kamonEnabled: Boolean,
+ kamonTagsEnabled: Boolean,
+ logsEnabled: Boolean)
object TransactionId {
+ val metricConfig = loadConfigOrThrow[MetricConfig](ConfigKeys.metrics)
// get the metric parameters directly from the environment since WhiskConfig can not be instantiated here
- val metricsKamon: Boolean = sys.env.get("METRICS_KAMON").getOrElse("False").toBoolean
- val metricsKamonTags: Boolean = sys.env.get("METRICS_KAMON_TAGS").getOrElse("False").toBoolean
- val metricsLog: Boolean = sys.env.get("METRICS_LOG").getOrElse("True").toBoolean
+ val metricsKamon: Boolean = metricConfig.kamonEnabled
+ val metricsKamonTags: Boolean = metricConfig.kamonTagsEnabled
+ val metricsLog: Boolean = metricConfig.logsEnabled
val generatorConfig = loadConfigOrThrow[TransactionGeneratorConfig](ConfigKeys.transactions)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 32f5a90..dcc9962 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -247,4 +247,5 @@ object ConfigKeys {
val activationStore = "whisk.activation-store"
val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
+ val metrics = "whisk.metrics"
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index d65761b..a20795b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -27,6 +27,7 @@ import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.{ByteString, ByteStringBuilder}
import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import kamon.metric.MeasurementUnit
import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat, _}
import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure}
@@ -527,7 +528,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
private def createToken(action: String, read: Boolean = true): LogMarkerToken = {
val mode = if (read) "read" else "write"
val tags = Map("action" -> action, "mode" -> mode, "collection" -> collName)
- if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags)
- else LogMarkerToken("cosmosdb", "ru", collName, Some(action))
+ if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags)(MeasurementUnit.none)
+ else LogMarkerToken("cosmosdb", "ru", collName, Some(action))(MeasurementUnit.none)
}
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
index 352be10..3e71fc8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala
@@ -27,6 +27,7 @@ import akka.http.scaladsl.server.RouteResult.Rejected
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.directives._
import akka.stream.ActorMaterializer
+import kamon.metric.MeasurementUnit
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
@@ -145,7 +146,7 @@ trait BasicHttpService extends Directives {
m.toLowerCase,
LoggingMarkers.count,
Some(res.status.intValue.toString),
- Map("statusCode" -> res.status.intValue.toString))
+ Map("statusCode" ->
res.status.intValue.toString))(MeasurementUnit.time.milliseconds)
val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))
MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
index 2ce26d0..7232453 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
@@ -18,9 +18,7 @@
package org.apache.openwhisk.http
import akka.event.Logging
-
-import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{MetricsRoute, TransactionId}
/**
* This trait extends the BasicHttpService with a standard "ping" endpoint which
@@ -28,7 +26,7 @@ import org.apache.openwhisk.common.TransactionId
*/
trait BasicRasService extends BasicHttpService {
- override def routes(implicit transid: TransactionId) = ping
+ override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute()
override def loglevelForRoute(route: String): Logging.LogLevel = {
if (route == "/ping") {
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
new file mode 100644
index 0000000..710d361
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/common/PrometheusTests.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+import akka.http.scaladsl.coding.Gzip
+import akka.http.scaladsl.model.{HttpCharsets, HttpResponse}
+import akka.http.scaladsl.model.headers.HttpEncodings.gzip
+import akka.http.scaladsl.model.headers.{`Accept-Encoding`, `Content-Encoding`, HttpEncoding, HttpEncodings}
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.matchers.Matcher
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class PrometheusTests extends FlatSpec with Matchers with ScalatestRouteTest with BeforeAndAfterAll with ScalaFutures {
+ behavior of "Prometheus"
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ //Modify Kamon to have a very small tick interval
+ val newConfig = ConfigFactory.parseString("""kamon {
+ | metric {
+ | tick-interval = 50 ms
+ | optimistic-tick-alignment = no
+ | }
+ |}""".stripMargin).withFallback(ConfigFactory.load())
+ Kamon.reconfigure(newConfig)
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ Kamon.reconfigure(ConfigFactory.load())
+ }
+
+ it should "respond to /metrics" in {
+ val api = new KamonPrometheus
+ Kamon.counter("foo_bar").increment(42)
+
+ //Sleep to ensure that Kamon metrics are pushed to reporters
+ Thread.sleep(2.seconds.toMillis)
+ Get("/metrics") ~> `Accept-Encoding`(gzip) ~> api.route ~> check {
+ // Check that the response conforms to what the Prometheus scraper accepts
+ contentType.charsetOption shouldBe Some(HttpCharsets.`UTF-8`)
+ contentType.mediaType.params("version") shouldBe "0.0.4"
+ response should haveContentEncoding(gzip)
+
+ val responseText = Unmarshal(Gzip.decodeMessage(response)).to[String].futureValue
+ withClue(responseText) {
+ responseText should include("foo_bar")
+ }
+ }
+ api.close()
+ }
+
+ it should "not be enabled by default" in {
+ Get("/metrics") ~> MetricsRoute() ~> check {
+ handled shouldBe false
+ }
+ }
+
+ private def haveContentEncoding(encoding: HttpEncoding): Matcher[HttpResponse] =
+ be(encoding) compose {
+ (_: HttpResponse).header[`Content-Encoding`].map(_.encodings.head).getOrElse(HttpEncodings.identity)
+ }
+}