This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
     new a87340b  Add more diagnostic information to completion ack processing (#4561)
a87340b is described below

commit a87340bf72914a2710542511415a5a0a047db1fc
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Tue Jul 23 08:30:51 2019 +0200

    Add more diagnostic information to completion ack processing (#4561)

    * Add more diagnostic information to completion ack processing

    Forced completion acks can be a source of overloaded invokers because forced acks free up invoker slots in the load balancer. If the invoker is just "late" with running an activation so that the invoker sends the result / completion ack after the completion ack timeout has been reached in the load balancer, the load balancer may send new activations to the invoker while it is still fully occupied. As a result, these new activations have to wait for a container and can cause forced com [...]

    Goal of this change is to improve visibility and diagnostic information so that this mechanism can be better understood and fixed in the next step.

    * Add metrics for forced completion acks.
    * Log more diagnostic information when forcing a completion ack due to timeout.
    * Log a warning if a completion ack arrives after it has already been forced - this is an indication that action processing took too long. Today, this situation is only logged as debug message.
    * Improve code comments.
    * Provide a function for calculating the completion ack timeout. In the long term, this should be unified with the action wait timeout in sequence activations.

    * Add metrics to documentation

    * Address review feedback

    Streamline implementation of `CompletionAckType` and sub-types to save boilerplate code. This should make the code more readable.

    * Address review feedback

    Use LogMarkerToken singletons when emitting metrics related to completion acks instead of creating LogMarkerToken instances whenever emitting a counter.
    * Address review feedback

    Only use curly braces for expressions in string interpolations where required to prevent IntelliJ warnings.

    * Address review feedback

    Make LogMarkerToken instances private.
---
 .../org/apache/openwhisk/common/Logging.scala      |  43 +++++++++
 .../core/loadBalancer/CommonLoadBalancer.scala     | 107 ++++++++++++++++-----
 .../ShardingContainerPoolBalancer.scala            |  15 ++-
 docs/metrics.md                                    |  14 ++-
 4 files changed, 152 insertions(+), 27 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 01c9754..c728c5a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -352,6 +352,26 @@ object LoggingMarkers {
   private val containerClient = "containerClient"
 
   /*
+   * The following markers are used to emit log messages as well as metrics. Add all LogMarkerTokens below to
+   * have a reference list of all metrics. The list below contains LogMarkerToken singletons (val) as well as
+   * LogMarkerToken creation functions (def). The LogMarkerToken creation functions allow to include variable
+   * information in metrics, such as the controller / invoker id or commands executed by a container factory.
+   *
+   * When using LogMarkerTokens for emitting metrics, you should use the convenience functions only once to
+   * create LogMarkerToken singletons instead of creating LogMarkerToken instances over and over again for each
+   * metric emit.
+   *
+   * Example:
+   * val MY_COUNTER_GREEN = LoggingMarkers.MY_COUNTER(GreenCounter)
+   * ...
+ * MetricEmitter.emitCounterMetric(MY_COUNTER_GREEN) + * + * instead of + * + * MetricEmitter.emitCounterMetric(LoggingMarkers.MY_COUNTER(GreenCounter)) + */ + + /* * Controller related markers */ def CONTROLLER_STARTUP(id: String) = @@ -414,6 +434,29 @@ object LoggingMarkers { LogMarkerToken(loadbalancer + controllerInstance.asString, s"memory${actionType}Inflight", counter)( MeasurementUnit.none) + // Counter metrics for completion acks in load balancer + sealed abstract class CompletionAckType(val name: String) { def asString: String = name } + case object RegularCompletionAck extends CompletionAckType("regular") + case object ForcedCompletionAck extends CompletionAckType("forced") + case object HealthcheckCompletionAck extends CompletionAckType("healthcheck") + case object RegularAfterForcedCompletionAck extends CompletionAckType("regularAfterForced") + case object ForcedAfterRegularCompletionAck extends CompletionAckType("forcedAfterRegular") + + // Convenience function to create log marker tokens used for emitting counter metrics related to completion acks. 
+ def LOADBALANCER_COMPLETION_ACK(controllerInstance: ControllerInstanceId, completionAckType: CompletionAckType) = + if (TransactionId.metricsKamonTags) + LogMarkerToken( + loadbalancer, + "completionAck", + counter, + None, + Map("controller_id" -> controllerInstance.asString, "type" -> completionAckType.asString))(MeasurementUnit.none) + else + LogMarkerToken( + loadbalancer + controllerInstance.asString, + "completionAck_" + completionAckType.asString, + counter)(MeasurementUnit.none) + // Time that is needed to execute the action val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)(MeasurementUnit.time.milliseconds) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala index 724a5f7..51bf4a8 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala @@ -84,17 +84,39 @@ abstract class CommonLoadBalancer(config: WhiskConfig, override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue()) /** + * Calculate the duration within which a completion ack must be received for an activation. + * + * Calculation is based on the passed action time limit. If the passed action time limit is shorter than + * the configured standard action time limit, the latter is used to avoid too tight timeouts. + * + * The base timeout is multiplied with a configurable timeout factor. This dilution controls how much slack you + * want to allow in your system before you start reporting failed activations. The default value of 2 bases + * on invoker behavior that a cold invocation's init duration may be as long as its run duration. Higher factors + * may account for additional wait times. 
+ * + * Finally, a constant duration is added to the diluted timeout to be lenient towards general delays / wait times. + * + * @param actionTimeLimit the action's time limit + * @return the calculated time duration within which a completion ack must be received + */ + private def calculateCompletionAckTimeout(actionTimeLimit: FiniteDuration): FiniteDuration = { + (actionTimeLimit.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute + } + + /** * 2. Update local state with the activation to be executed scheduled. * * All activations are tracked in the activationSlots map. Additionally, blocking invokes - * are tracked in the activation results map. When a result is received via activeack, it + * are tracked in the activationPromises map. When a result is received via result ack, it * will cause the result to be forwarded to the caller waiting on the result, and cancel * the DB poll which is also trying to do the same. + * Once the completion ack arrives, activationSlots entry will be removed. */ protected def setupActivation(msg: ActivationMessage, action: ExecutableWhiskActionMetaData, instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = { + // Needed for emitting metrics. totalActivations.increment() val isBlackboxInvocation = action.exec.pull val totalActivationMemory = @@ -103,23 +125,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig, activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment() - // Timeout is a multiple of the configured maximum action duration. The minimum timeout is the configured standard - // value for action durations to avoid too tight timeouts. - // Timeouts in general are diluted by a configurable factor. In essence this factor controls how much slack you want - // to allow in your topics before you start reporting failed activations. 
- val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute + // Completion Ack must be received within the calculated time. + val completionAckTimeout = calculateCompletionAckTimeout(action.limits.timeout.duration) + // If activation is blocking, store a promise that we can mark successful later on once the result ack + // arrives. Return a Future representing the promise to caller. + // If activation is non-blocking, return a successfully completed Future to caller. val resultPromise = if (msg.blocking) { activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future } else Future.successful(Left(msg.activationId)) - // Install a timeout handler for the catastrophic case where an active ack is not received at all + // Install a timeout handler for the catastrophic case where a completion ack is not received at all // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when - // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); + // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); // in this case, if the activation handler is still registered, remove it and update the books. + // + // Attention: a significantly delayed completion ack means that the invoker is still busy or will be busy in future + // with running the action. So the current strategy of freeing up the activation's memory in invoker + // book-keeping will allow the load balancer to send more activations to the invoker. This can lead to + // invoker overloads so that activations need to wait until other activations complete. 
activationSlots.getOrElseUpdate( msg.activationId, { - val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) { + val timeoutHandler = actorSystem.scheduler.scheduleOnce(completionAckTimeout) { processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance) } @@ -129,10 +156,12 @@ abstract class CommonLoadBalancer(config: WhiskConfig, msg.user.namespace.uuid, instance, action.limits.memory.megabytes.MB, + action.limits.timeout.duration, action.limits.concurrency.maxConcurrent, action.fullyQualifiedName(true), timeoutHandler, - isBlackboxInvocation) + isBlackboxInvocation, + msg.blocking) }) resultPromise @@ -167,14 +196,11 @@ abstract class CommonLoadBalancer(config: WhiskConfig, } } - /** - * Subscribes to active acks (completion messages from the invokers), and - * registers a handler for received active acks from invokers. - */ + /** Subscribes to ack messages from the invokers (result / completion) and registers a handler for these messages. */ private val activationFeed: ActorRef = feedFactory.createFeed(actorSystem, messagingProvider, processAcknowledgement) - /** 4. Get the active-ack message and parse it */ + /** 4. 
Get the ack message and parse it */ protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future { val raw = new String(bytes, StandardCharsets.UTF_8) AcknowledegmentMessage.parse(raw) match { @@ -214,6 +240,18 @@ abstract class CommonLoadBalancer(config: WhiskConfig, protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) + // Singletons for counter metrics related to completion acks + protected val LOADBALANCER_COMPLETION_ACK_REGULAR = + LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck) + protected val LOADBALANCER_COMPLETION_ACK_FORCED = + LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck) + protected val LOADBALANCER_COMPLETION_ACK_HEALTHCHECK = + LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, HealthcheckCompletionAck) + protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED = + LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck) + protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR = + LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck) + /** 6. 
Process the completion ack and update the state */ protected[loadBalancer] def processCompletion(aid: ActivationId, tid: TransactionId, @@ -238,7 +276,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig, totalActivations.decrement() val totalActivationMemory = if (entry.isBlackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory - totalActivationMemory.add(entry.memory.toMB * (-1)) + totalActivationMemory.add(entry.memoryLimit.toMB * (-1)) activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) releaseInvoker(invoker, entry) @@ -248,16 +286,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig, // notice here that the activationPromises is not touched, because the expectation is that // the active ack is received as expected, and processing that message removed the promise // from the corresponding map + logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid) + + MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR) + } else { // the entry has timed out; if the active ack is still around, remove its entry also // and complete the promise with a failure if necessary activationPromises .remove(aid) .foreach(_.tryFailure(new Throwable("no completion or active ack received yet"))) + val actionType = if (entry.isBlackbox) "blackbox" else "managed" + val blockingType = if (entry.isBlocking) "blocking" else "non-blocking" + val completionAckTimeout = calculateCompletionAckTimeout(entry.timeLimit) + logging.warn( + this, + s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $completionAckTimeout from $invoker")( + tid) + + MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED) } - logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid) - // Active acks that are received 
here are strictly from user actions - health actions are not part of + // Completion acks that are received here are strictly from user actions - health actions are not part of // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. // guard this invokerPool ! InvocationFinishedMessage(invoker, invocationResult) @@ -266,17 +316,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig, // is important to pass to the invokerPool because they are used to determine if the invoker can be considered // healthy again. logging.info(this, s"received completion ack for health action on $invoker")(tid) + + MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK) + // guard this invokerPool ! InvocationFinishedMessage(invoker, invocationResult) case None if !forced => - // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack). + // Received a completion ack that has already been taken out of the state because of a timeout (forced ack). // The result is ignored because a timeout has already been reported to the invokerPool per the force. - logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid) + // Logging this condition as a warning because the invoker processed the activation and sent a completion + // message - but not in time. + logging.warn( + this, + s"received completion ack for '$aid' from $invoker which has no entry, system error=$isSystemError")(tid) + + MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED) case None => - // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can - // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active - // ack canceled the timer). As the active ack is already processed we don't have to do anything here. 
+ // The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can + // happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion + // ack canceled the timer). As the completion ack is already processed we don't have to do anything here. logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid) + + MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR) } } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 4a57400..d03081d 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -41,6 +41,7 @@ import org.apache.openwhisk.spi.SpiLoader import scala.annotation.tailrec import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration /** * A loadbalancer that schedules workload based on a hashing-algorithm. 
@@ -325,7 +326,7 @@ class ShardingContainerPoolBalancer( override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = { schedulingState.invokerSlots .lift(invoker.toInt) - .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt)) + .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memoryLimit.toMB.toInt)) } } @@ -603,13 +604,21 @@ case class ShardingContainerPoolBalancerConfig(managedFraction: Double, blackbox * @param id id of the activation * @param namespaceId namespace that invoked the action * @param invokerName invoker the action is scheduled to + * @param memoryLimit memory limit of the invoked action + * @param timeLimit time limit of the invoked action + * @param maxConcurrent concurrency limit of the invoked action + * @param fullyQualifiedEntityName fully qualified name of the invoked action * @param timeoutHandler times out completion of this activation, should be canceled on good paths + * @param isBlackbox true if the invoked action is a blackbox action, otherwise false (managed action) + * @param isBlocking true if the action is invoked in a blocking fashion, i.e. "somebody" waits for the result */ case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: InvokerInstanceId, - memory: ByteSize, + memoryLimit: ByteSize, + timeLimit: FiniteDuration, maxConcurrent: Int, fullyQualifiedEntityName: FullyQualifiedEntityName, timeoutHandler: Cancellable, - isBlackbox: Boolean) + isBlackbox: Boolean, + isBlocking: Boolean) diff --git a/docs/metrics.md b/docs/metrics.md index b2bfc48..9d00b38 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -98,7 +98,7 @@ Histogram record the [distribution](http://kamon.io/documentation/0.6.x/kamon-co #### Gauges -Gauges record the [distribution](https://kamon.io/docs/latest/core/metrics/#gauges) of given metric and there names are prefixed with `openwhisk.gauge`. 
For example `openwhisk.gauge.loadbalancer_totalHealthyInvoker_counter`. A gauge metrics provides the value at the given point and reports the same data unless the value has been changed be incremental or decremental than before. Gauges are useful for reporting metrics like kafka queue size or disk size. +Gauges record the [distribution](https://kamon.io/docs/latest/core/metrics/#gauges) of given metric and their names are prefixed with `openwhisk.gauge`. For example `openwhisk.gauge.loadbalancer_totalHealthyInvoker_counter`. A gauge metrics provides the value at the given point and reports the same data unless the value has been changed be incremental or decremental than before. Gauges are useful for reporting metrics like kafka queue size or disk size. ### Metric Details @@ -157,6 +157,18 @@ Metrics below are for invoker state as recorded within load balancer monitoring. * `openwhisk.gauge.loadbalancer_totalOfflineInvoker<invokerType>_counter` (gauge) - Records the count of managed invokers considered offline when no health pings arrive from the invokers. **invokerType** defines whether it is a managed or a blackbox invoker. * `openwhisk.gauge.loadbalancer_totalUnhealthyInvoker<invokerType>_counter` (gauge) - Records the count of managed invokers considered unhealthy when health pings arrive fine but the invokers report system errors. **invokerType** defines whether it is a managed or a blackbox invoker. +Metrics below provide information about completion ack processing in load balancers. Depending on configuration setting `metrics_kamon_tags` (see above), a base metric with tags or a set of metrics without tags will be emitted. + +* Base metric `openwhisk.counter.loadbalancer_completionAck_counter`: count of processed regular or forced completion acks. +* Tag `controller_id`: the controller's id. +* Tag `type`: the exact type of completion ack. + * Type `regular`: a regular completion ack sent by an invoker and received in time. 
Does not include completion acks for healthcheck actions. + * Type `forced`: no completion ack was received in time and the timeout forced the completion ack to close. + * Type `healthcheck`: a regular completion ack for healthcheck actions sent by an invoker and received in time. + * Type `regularAfterForced`: a regular completion ack sent by an invoker and not received in time. The completion ack was already forced. + * Type `forcedAfterRegular`: a timeout tries to force a completion ack that has already been closed by a regular completion ack. A race condition that can occur if the regular completion ack is received near the timeout. +* If `metrics_kamon_tags` is set to `false`, a set of metrics will be emitted constructed using following scheme: `openwhisk.counter.loadbalancer<controller_id>_completionAck_<type>_counter`. + #### Invoker metrics ##### Container Init
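
A minimal sketch of the completion ack timeout calculation introduced by `calculateCompletionAckTimeout` in this commit, together with the untagged metric name scheme from the docs/metrics.md hunk. The object name `CompletionAckSketch`, the helper `untaggedMetricName`, and the two constant values are assumptions made for illustration and are not code from the repository. In the commit, the values come from `TimeLimit.STD_DURATION` and `lbConfig.timeoutFactor`; only the default factor of 2 is stated in the commit's own comment.

```scala
import scala.concurrent.duration._

object CompletionAckSketch {
  // Assumption: stand-in for TimeLimit.STD_DURATION, value chosen for illustration only.
  val stdDuration: FiniteDuration = 1.minute

  // Assumption: stand-in for lbConfig.timeoutFactor; the commit's comment names 2 as the default.
  val timeoutFactor: Long = 2L

  // Same shape as the expression in the diff: use at least the standard limit,
  // multiply by the dilution factor, then add a constant one minute of slack.
  def completionAckTimeout(actionTimeLimit: FiniteDuration): FiniteDuration =
    (actionTimeLimit.max(stdDuration) * timeoutFactor) + 1.minute

  // Hypothetical helper that follows the documented scheme for metrics without tags:
  // openwhisk.counter.loadbalancer<controller_id>_completionAck_<type>_counter
  def untaggedMetricName(controllerId: String, ackType: String): String =
    s"openwhisk.counter.loadbalancer${controllerId}_completionAck_${ackType}_counter"
}
```

Under these assumed values, an action time limit of 10 seconds is raised to the 1 minute standard limit, doubled, and padded, giving a 3 minute completion ack timeout. A 5 minute limit gives 11 minutes.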