This is an automated email from the ASF dual-hosted git repository.
vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b50254d Update to Kamon 1.1.3 from 0.6 series (#4165)
b50254d is described below
commit b50254d3ccb83e5960e88b5b8521396ca9c54dd1
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Wed Jan 9 19:03:25 2019 +0530
Update to Kamon 1.1.3 from 0.6 series (#4165)
This change is a stepping stone for metrics reporting via Prometheus. Kamon
Prometheus support was added with Kamon 1.0 series.
---
common/scala/build.gradle | 4 +-
common/scala/src/main/resources/application.conf | 19 +++++-----
.../org/apache/openwhisk/common/Logging.scala | 44 +++++++++++++++++-----
.../openwhisk/core/controller/Controller.scala | 8 ++--
.../apache/openwhisk/core/invoker/Invoker.scala | 12 +++---
5 files changed, 55 insertions(+), 32 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index d7b8497..0a040b7 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,8 +64,8 @@ dependencies {
compile 'com.github.ben-manes.caffeine:caffeine:2.6.2'
compile 'com.google.code.findbugs:jsr305:3.0.2'
compile 'io.fabric8:kubernetes-client:4.0.3'
- compile 'io.kamon:kamon-core_2.12:0.6.7'
- compile 'io.kamon:kamon-statsd_2.12:0.6.7'
+ compile 'io.kamon:kamon-core_2.12:1.1.3'
+ compile 'io.kamon:kamon-statsd_2.12:1.0.0'
//for mesos
compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.17'
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 40c98a7..40049f8 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -19,7 +19,11 @@ akka.http {
#kamon related configuration
kamon {
-
+ environment {
+ # Identifier for this service. For keeping it backward compatible
setting to natch previous
+ # statsd name
+ service = "openwhisk-statsd"
+ }
metric {
tick-interval = 1 second
}
@@ -40,17 +44,12 @@ kamon {
dispatcher = [ "*" ]
}
- simple-metric-key-generator {
- # Application prefix for all metrics pushed to StatsD. The default
namespacing scheme for metrics follows
- # this pattern:
- # application.host.entity.entity-name.metric-name
- application = "openwhisk-statsd"
- }
+ metric-key-generator =
org.apache.openwhisk.common.WhiskStatsDMetricKeyGenerator
}
- modules {
- kamon-statsd.auto-start = yes
- }
+ reporters = [
+ "kamon.statsd.StatsDReporter"
+ ]
}
whisk {
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 3bc3597..b7cff4e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -20,9 +20,11 @@ package org.apache.openwhisk.common
import java.io.PrintStream
import java.time.{Clock, Instant, ZoneId}
import java.time.format.DateTimeFormatter
+
import akka.event.Logging._
import akka.event.LoggingAdapter
import kamon.Kamon
+import kamon.statsd.{MetricKeyGenerator, SimpleMetricKeyGenerator}
import org.apache.openwhisk.core.entity.ControllerInstanceId
trait Logging {
@@ -210,17 +212,15 @@ object LogMarkerToken {
}
object MetricEmitter {
-
- val metrics = Kamon.metrics
-
def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
if (TransactionId.metricsKamon) {
if (TransactionId.metricsKamonTags) {
- metrics
- .counter(token.toString, token.tags)
+ Kamon
+ .counter(createName(token.toString, "counter"))
+ .refine(token.tags)
.increment(times)
} else {
- metrics.counter(token.toStringWithSubAction).increment(times)
+ Kamon.counter(createName(token.toStringWithSubAction,
"counter")).increment(times)
}
}
}
@@ -228,14 +228,40 @@ object MetricEmitter {
def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = {
if (TransactionId.metricsKamon) {
if (TransactionId.metricsKamonTags) {
- metrics
- .histogram(token.toString, token.tags)
+ Kamon
+ .histogram(createName(token.toString, "histogram"))
+ .refine(token.tags)
.record(value)
} else {
- metrics.histogram(token.toStringWithSubAction).record(value)
+ Kamon.histogram(createName(token.toStringWithSubAction,
"histogram")).record(value)
}
}
}
+
+ /**
+ * Kamon 1.0 onwards does not include the metric type in the metric name
which cause issue
+ * for us as we use same metric name for counter and histogram. So to be
backward compatible we
+ * need to prefix the name with type
+ */
+ private def createName(name: String, metricType: String) = {
+ s"$metricType.$name"
+ }
+}
+
+/**
+ * Name generator to make names compatible to pre Kamon 1.0 logic. Statsd
reporter "normalizes"
+ * the key name by replacing all `.` with `_`. Pre 1.0 the metric category was
added by Statsd
+ * reporter itself. However now we pass it explicitly. So to retain the pre
1.0 name we need to replace
+ * normalized name with one having category followed by `.` instead of `_`
+ */
+class WhiskStatsDMetricKeyGenerator(config: com.typesafe.config.Config)
extends MetricKeyGenerator {
+ val simpleGen = new SimpleMetricKeyGenerator(config)
+ override def generateKey(name: String, tags: Map[String, String]): String = {
+ val key = simpleGen.generateKey(name, tags)
+ if (key.contains(".counter_")) key.replace(".counter_", ".counter.")
+ else if (key.contains(".histogram_")) key.replace(".histogram_",
".histogram.")
+ else key
+ }
}
object LoggingMarkers {
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 1cb38ee..5136a66 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -42,8 +42,9 @@ import org.apache.openwhisk.core.loadBalancer.{InvokerState,
LoadBalancerProvide
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
+import scala.concurrent.ExecutionContext.Implicits
import scala.concurrent.duration.DurationInt
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
import scala.util.{Failure, Success}
/**
@@ -206,15 +207,14 @@ object Controller {
"runtimes" -> runtimes.toJson)
def main(args: Array[String]): Unit = {
- Kamon.start()
+ Kamon.loadReportersFromConfig()
implicit val actorSystem = ActorSystem("controller-actor-system")
implicit val logger = new
AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
"shutdownKamon") { () =>
logger.info(this, s"Shutting down Kamon with coordinated shutdown")
- Kamon.shutdown()
- Future.successful(Done)
+ Kamon.stopAllReporters().map(_ => Done)(Implicits.global)
}
// extract configuration data from the environment
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 14ca32b..2f35afb 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -37,7 +37,7 @@ import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
import scala.util.{Failure, Try}
case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] =
None, displayedName: Option[String] = None)
@@ -59,13 +59,12 @@ object Invoker {
def initKamon(instance: Int): Unit = {
// Replace the hostname of the invoker to the assigned id of the invoker.
val newKamonConfig = Kamon.config
- .withValue(
- "kamon.statsd.simple-metric-key-generator.hostname-override",
- ConfigValueFactory.fromAnyRef(s"invoker$instance"))
- Kamon.start(newKamonConfig)
+ .withValue("kamon.environment.host",
ConfigValueFactory.fromAnyRef(s"invoker$instance"))
+ Kamon.reconfigure(newKamonConfig)
}
def main(args: Array[String]): Unit = {
+ Kamon.loadReportersFromConfig()
implicit val ec =
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
implicit val actorSystem: ActorSystem =
ActorSystem(name = "invoker-actor-system", defaultExecutionContext =
Some(ec))
@@ -75,8 +74,7 @@ object Invoker {
// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
"shutdownKamon") { () =>
logger.info(this, s"Shutting down Kamon with coordinated shutdown")
- Kamon.shutdown()
- Future.successful(Done)
+ Kamon.stopAllReporters().map(_ => Done)
}
// load values for the required properties from the environment