markusthoemmes closed pull request #3180: "Emit metrics relevant for the usage of the system". URL: https://github.com/apache/incubator-openwhisk/pull/3180
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index e63c824f30..06078ba545 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -22,12 +22,15 @@ import java.time.Clock import java.time.Instant import java.time.ZoneId import java.time.format.DateTimeFormatter +import java.util.concurrent.atomic.AtomicLong import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel} import akka.event.Logging.LogLevel import akka.event.LoggingAdapter import kamon.Kamon +import scala.concurrent.duration._ + trait Logging { /** @@ -165,10 +168,11 @@ private object Emitter { } case class LogMarkerToken(component: String, action: String, state: String) { - override def toString() = component + "_" + action + "_" + state + override def toString: String = asString + def asString: String = component + "_" + action + "_" + state - def asFinish = copy(state = LoggingMarkers.finish) - def asError = copy(state = LoggingMarkers.error) + def asFinish: LogMarkerToken = copy(state = LoggingMarkers.finish) + def asError: LogMarkerToken = copy(state = LoggingMarkers.error) } object LogMarkerToken { @@ -184,16 +188,22 @@ object MetricEmitter { val metrics = Kamon.metrics - def emitCounterMetric(token: LogMarkerToken) = { - metrics - .counter(token.toString) - .increment(1) - } + def incrementCounter(token: LogMarkerToken): Unit = metrics.counter(token.asString).increment(1) + def emitHistogramMetric(token: LogMarkerToken, value: Long): Unit = metrics.histogram(token.asString).record(value) + + /** + * Creating a gauge to record values to. 
+ * + * Uses a StableGauge which will always report the same value until that value is changed. + * + * @param token name of the gauge + * @return a stable gauge to record values to + */ + def setupGauge(token: LogMarkerToken) = new StableGauge(token.asString) + class StableGauge(name: String, value: AtomicLong = new AtomicLong(0)) { + metrics.gauge(name, 1.second)(value.get) - def emitHistogramMetric(token: LogMarkerToken, value: Long) = { - metrics - .histogram(token.toString) - .record(value) + def record(newValue: Long): Unit = value.set(newValue) } } @@ -234,6 +244,9 @@ object LoggingMarkers { // Check invoker healthy state from loadbalancer val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count) val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count) + val LOADBALANCER_INVOKER_HEALTHY = LogMarkerToken(loadbalancer, "invokerHealthy", count) + + val LOADBALANCER_ACTIVATIONS_INFLIGHT = LogMarkerToken(loadbalancer, "activationsInFlight", count) // Time that is needed to execute the action val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start) diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala index 95e6eefc09..bf907162a7 100644 --- a/common/scala/src/main/scala/whisk/common/TransactionId.scala +++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala @@ -63,7 +63,7 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal { } if (TransactionId.metricsKamon) { - MetricEmitter.emitCounterMetric(marker) + MetricEmitter.incrementCounter(marker) } } @@ -89,7 +89,7 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal { } if (TransactionId.metricsKamon) { - MetricEmitter.emitCounterMetric(marker) + MetricEmitter.incrementCounter(marker) } StartMarker(Instant.now, marker) @@ -156,7 +156,7 @@ case class TransactionId private (meta: 
TransactionMetadata) extends AnyVal { if (TransactionId.metricsKamon) { MetricEmitter.emitHistogramMetric(endMarker, deltaToEnd) - MetricEmitter.emitCounterMetric(endMarker) + MetricEmitter.incrementCounter(endMarker) } } diff --git a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala index 798bb1eff8..379ddebd0a 100644 --- a/common/scala/src/main/scala/whisk/http/BasicHttpService.scala +++ b/common/scala/src/main/scala/whisk/http/BasicHttpService.scala @@ -114,7 +114,7 @@ trait BasicHttpService extends Directives with TransactionCounter { if (TransactionId.metricsKamon) { MetricEmitter.emitHistogramMetric(token, tid.deltaToStart) - MetricEmitter.emitCounterMetric(token) + MetricEmitter.incrementCounter(token) } if (TransactionId.metricsLog) { Some(LogEntry(s"[$tid] [$name] $marker", l)) diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala index 6256af2a5b..9e2192556a 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala @@ -17,8 +17,7 @@ package whisk.core.entitlement -import whisk.common.Logging -import whisk.common.TransactionId +import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} import whisk.core.entity.Identity import whisk.core.loadBalancer.LoadBalancer import whisk.http.Messages @@ -55,11 +54,14 @@ class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: I } } + private val inflightActivationsReporter = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_ACTIVATIONS_INFLIGHT) + /** * Checks whether the system is in a generally overloaded state. 
*/ def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = { loadBalancer.totalActiveActivations.map { concurrentActivations => + inflightActivationsReporter.record(concurrentActivations) logging.info( this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit") diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index cd80a8ee12..cb3259efdb 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -35,10 +35,7 @@ import akka.actor.FSM.Transition import akka.actor.Props import akka.pattern.pipe import akka.util.Timeout -import whisk.common.AkkaLogging -import whisk.common.LoggingMarkers -import whisk.common.RingBuffer -import whisk.common.TransactionId +import whisk.common._ import whisk.core.connector._ import whisk.core.entitlement.Privilege import whisk.core.entity.ActivationId.ActivationIdGenerator @@ -117,7 +114,15 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender) } - def logStatus() = { + val healthyInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_HEALTHY) + val unhealthyInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY) + val offlineInvokers = MetricEmitter.setupGauge(LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE) + + def logStatus(): Unit = { + healthyInvokers.record(status.count(_._2 == Healthy).toLong) + unhealthyInvokers.record(status.count(_._2 == UnHealthy).toLong) + offlineInvokers.record(status.count(_._2 == Offline).toLong) + val pretty = status.map { case (instance, state) => s"${instance.toInt} -> $state" } logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}") } 
@@ -254,19 +259,9 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId) /** Logging on Transition change */ onTransition { - case _ -> Offline => - transid.mark( - this, - LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE, - s"$name is offline", - akka.event.Logging.WarningLevel) - case _ -> UnHealthy => - transid.mark( - this, - LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY, - s"$name is unhealthy", - akka.event.Logging.WarningLevel) - case _ -> Healthy => logging.info(this, s"$name is healthy") + case _ -> Offline => logging.warn(this, s"$name is offline") + case _ -> UnHealthy => logging.warn(this, s"$name is unhealthy") + case _ -> Healthy => logging.info(this, s"$name is healthy") } /** Scheduler to send test activations when the invoker is unhealthy. */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services