This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 8e7c8e5 Treat a timed out active ack as failed activation in invokerhealth protocol (#2658) 8e7c8e5 is described below commit 8e7c8e5bc7f2cd672c2314d13e56ef8f94cb96f6 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Wed Sep 27 21:49:29 2017 +0200 Treat a timed out active ack as failed activation in invokerhealth protocol (#2658) --- .../core/loadBalancer/InvokerSupervision.scala | 6 ++-- .../core/loadBalancer/LoadBalancerService.scala | 33 ++++++++++++++++------ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 94fc070..13517de 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -283,9 +283,11 @@ class InvokerActor(invokerInstance: InstanceId, controllerInstance: InstanceId) private def handleCompletionMessage(wasActivationSuccessful: Boolean, buffer: RingBuffer[Boolean]) = { buffer.add(wasActivationSuccessful) - // If the current state is UnHealthy, then the active ack is the result of a test action. - // If this is successful it seems like the Invoker is Healthy again. So we execute immediately + // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately // a new test action to remove the errors out of the RingBuffer as fast as possible. + // The actions that arrive while the invoker is unhealthy are most likely health actions. + // It is possible they are normal user actions as well. This can happen if such actions were in the + // invoker queue or in progress while the invoker's status flipped to Unhealthy. 
if (wasActivationSuccessful && stateName == UnHealthy) { invokeTestAction() } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index b7ee0dd..c957fa2 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -125,19 +125,39 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore */ private def processCompletion(response: Either[ActivationId, WhiskActivation], tid: TransactionId, - forced: Boolean): Unit = { + forced: Boolean, + invoker: InstanceId): Unit = { val aid = response.fold(l => l, r => r.activationId) + + // treat left as success (as it is the result of a message exceeding the bus limit) + val isSuccess = response.fold(l => true, r => !r.response.isWhiskError) + loadBalancerData.removeActivation(aid) match { case Some(entry) => logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) + // Active acks that are received here are strictly from user actions - health actions are not part of + // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. + // If the active ack was forced, because the waiting period expired, treat it as a failed activation. + // A cluster of such failures will eventually turn the invoker unhealthy and suspend queuing activations + // to that invoker topic. + invokerPool ! InvocationFinishedMessage(invoker, isSuccess && !forced) if (!forced) { entry.promise.trySuccess(response) } else { entry.promise.tryFailure(new Throwable("no active ack received")) } - case None => - // the entry was already removed + case None if !forced => + // the entry has already been removed but we receive an active ack for this activation Id. 
+ // This happens for health actions, because they don't have an entry in Loadbalancerdata or + // for activations that already timed out. + // For both cases, it looks like the invoker works again and we should send the status of + // the activation to the invokerPool. + invokerPool ! InvocationFinishedMessage(invoker, isSuccess) logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) + case None => + // the entry has already been removed by an active ack. This part of the code is reached by the timeout. + // As the active ack is already processed we don't have to do anything here. + logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid) } } @@ -156,7 +176,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore // in this case, if the activation handler is still registered, remove it and update the books. loadBalancerData.putActivation(activationId, { actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(Left(activationId), transid, forced = true) + processCompletion(Left(activationId), transid, forced = true, invoker = invokerName) } ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]()) @@ -254,11 +274,8 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore val raw = new String(bytes, StandardCharsets.UTF_8) CompletionMessage.parse(raw) match { case Success(m: CompletionMessage) => - processCompletion(m.response, m.transid, false) - // treat left as success (as it is the result a the message exceeding the bus limit) - val isSuccess = m.response.fold(l => true, r => !r.response.isWhiskError) + processCompletion(m.response, m.transid, forced = false, invoker = m.invoker) activationFeed ! MessageFeed.Processed - invokerPool ! InvocationFinishedMessage(m.invoker, isSuccess) case Failure(t) => activationFeed ! 
MessageFeed.Processed
--
To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.