This is an automated email from the ASF dual-hosted git repository.

vvraskin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b50254d  Update to Kamon 1.1.3 from 0.6 series (#4165)
b50254d is described below

commit b50254d3ccb83e5960e88b5b8521396ca9c54dd1
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Wed Jan 9 19:03:25 2019 +0530

    Update to Kamon 1.1.3 from 0.6 series (#4165)
    
    This change is a stepping stone for metrics reporting via Prometheus. Kamon 
Prometheus support was added with Kamon 1.0 series.
---
 common/scala/build.gradle                          |  4 +-
 common/scala/src/main/resources/application.conf   | 19 +++++-----
 .../org/apache/openwhisk/common/Logging.scala      | 44 +++++++++++++++++-----
 .../openwhisk/core/controller/Controller.scala     |  8 ++--
 .../apache/openwhisk/core/invoker/Invoker.scala    | 12 +++---
 5 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index d7b8497..0a040b7 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,8 +64,8 @@ dependencies {
     compile 'com.github.ben-manes.caffeine:caffeine:2.6.2'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
     compile 'io.fabric8:kubernetes-client:4.0.3'
-    compile 'io.kamon:kamon-core_2.12:0.6.7'
-    compile 'io.kamon:kamon-statsd_2.12:0.6.7'
+    compile 'io.kamon:kamon-core_2.12:1.1.3'
+    compile 'io.kamon:kamon-statsd_2.12:1.0.0'
     //for mesos
     compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.17'
 
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 40c98a7..40049f8 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -19,7 +19,11 @@ akka.http {
 
 #kamon related configuration
 kamon {
-
+    environment {
+      # Identifier for this service. For keeping it backward compatible 
setting to match previous
+      # statsd name
+      service = "openwhisk-statsd"
+    }
     metric {
         tick-interval = 1 second
     }
@@ -40,17 +44,12 @@ kamon {
             dispatcher =  [ "*" ]
         }
 
-        simple-metric-key-generator {
-            # Application prefix for all metrics pushed to StatsD. The default 
namespacing scheme for metrics follows
-            # this pattern:
-            #    application.host.entity.entity-name.metric-name
-            application = "openwhisk-statsd"
-        }
+        metric-key-generator = 
org.apache.openwhisk.common.WhiskStatsDMetricKeyGenerator
     }
 
-    modules {
-        kamon-statsd.auto-start = yes
-    }
+    reporters = [
+        "kamon.statsd.StatsDReporter"
+    ]
 }
 
 whisk {
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 3bc3597..b7cff4e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -20,9 +20,11 @@ package org.apache.openwhisk.common
 import java.io.PrintStream
 import java.time.{Clock, Instant, ZoneId}
 import java.time.format.DateTimeFormatter
+
 import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
+import kamon.statsd.{MetricKeyGenerator, SimpleMetricKeyGenerator}
 import org.apache.openwhisk.core.entity.ControllerInstanceId
 
 trait Logging {
@@ -210,17 +212,15 @@ object LogMarkerToken {
 }
 
 object MetricEmitter {
-
-  val metrics = Kamon.metrics
-
   def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
     if (TransactionId.metricsKamon) {
       if (TransactionId.metricsKamonTags) {
-        metrics
-          .counter(token.toString, token.tags)
+        Kamon
+          .counter(createName(token.toString, "counter"))
+          .refine(token.tags)
           .increment(times)
       } else {
-        metrics.counter(token.toStringWithSubAction).increment(times)
+        Kamon.counter(createName(token.toStringWithSubAction, 
"counter")).increment(times)
       }
     }
   }
@@ -228,14 +228,40 @@ object MetricEmitter {
   def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = {
     if (TransactionId.metricsKamon) {
       if (TransactionId.metricsKamonTags) {
-        metrics
-          .histogram(token.toString, token.tags)
+        Kamon
+          .histogram(createName(token.toString, "histogram"))
+          .refine(token.tags)
           .record(value)
       } else {
-        metrics.histogram(token.toStringWithSubAction).record(value)
+        Kamon.histogram(createName(token.toStringWithSubAction, 
"histogram")).record(value)
       }
     }
   }
+
+  /**
+   * Kamon 1.0 onwards does not include the metric type in the metric name, 
which causes an issue
+   * for us as we use the same metric name for counter and histogram. So to be 
backward compatible we
+   * need to prefix the name with the type.
+   */
+  private def createName(name: String, metricType: String) = {
+    s"$metricType.$name"
+  }
+}
+
+/**
+ * Name generator to make names compatible with pre-Kamon 1.0 logic. The StatsD 
reporter "normalizes"
+ * the key name by replacing all `.` with `_`. Pre 1.0 the metric category was 
added by the StatsD
+ * reporter itself. However, now we pass it explicitly. So to retain the pre 
1.0 name we need to replace the
+ * normalized name with one having the category followed by `.` instead of `_`.
+ */
+class WhiskStatsDMetricKeyGenerator(config: com.typesafe.config.Config) 
extends MetricKeyGenerator {
+  val simpleGen = new SimpleMetricKeyGenerator(config)
+  override def generateKey(name: String, tags: Map[String, String]): String = {
+    val key = simpleGen.generateKey(name, tags)
+    if (key.contains(".counter_")) key.replace(".counter_", ".counter.")
+    else if (key.contains(".histogram_")) key.replace(".histogram_", 
".histogram.")
+    else key
+  }
 }
 
 object LoggingMarkers {
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 1cb38ee..5136a66 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -42,8 +42,9 @@ import org.apache.openwhisk.core.loadBalancer.{InvokerState, 
LoadBalancerProvide
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
 import org.apache.openwhisk.spi.SpiLoader
 
+import scala.concurrent.ExecutionContext.Implicits
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
 import scala.util.{Failure, Success}
 
 /**
@@ -206,15 +207,14 @@ object Controller {
       "runtimes" -> runtimes.toJson)
 
   def main(args: Array[String]): Unit = {
-    Kamon.start()
+    Kamon.loadReportersFromConfig()
     implicit val actorSystem = ActorSystem("controller-actor-system")
     implicit val logger = new 
AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
 
     // Prepare Kamon shutdown
     
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
 "shutdownKamon") { () =>
       logger.info(this, s"Shutting down Kamon with coordinated shutdown")
-      Kamon.shutdown()
-      Future.successful(Done)
+      Kamon.stopAllReporters().map(_ => Done)(Implicits.global)
     }
 
     // extract configuration data from the environment
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 14ca32b..2f35afb 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -37,7 +37,7 @@ import org.apache.openwhisk.spi.SpiLoader
 import org.apache.openwhisk.utils.ExecutionContextFactory
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
 import scala.util.{Failure, Try}
 
 case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = 
None, displayedName: Option[String] = None)
@@ -59,13 +59,12 @@ object Invoker {
   def initKamon(instance: Int): Unit = {
     // Replace the hostname of the invoker to the assigned id of the invoker.
     val newKamonConfig = Kamon.config
-      .withValue(
-        "kamon.statsd.simple-metric-key-generator.hostname-override",
-        ConfigValueFactory.fromAnyRef(s"invoker$instance"))
-    Kamon.start(newKamonConfig)
+      .withValue("kamon.environment.host", 
ConfigValueFactory.fromAnyRef(s"invoker$instance"))
+    Kamon.reconfigure(newKamonConfig)
   }
 
   def main(args: Array[String]): Unit = {
+    Kamon.loadReportersFromConfig()
     implicit val ec = 
ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
     implicit val actorSystem: ActorSystem =
       ActorSystem(name = "invoker-actor-system", defaultExecutionContext = 
Some(ec))
@@ -75,8 +74,7 @@ object Invoker {
     // Prepare Kamon shutdown
     
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate,
 "shutdownKamon") { () =>
       logger.info(this, s"Shutting down Kamon with coordinated shutdown")
-      Kamon.shutdown()
-      Future.successful(Done)
+      Kamon.stopAllReporters().map(_ => Done)
     }
 
     // load values for the required properties from the environment

Reply via email to