This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git

The following commit(s) were added to refs/heads/master by this push:
     new 1e90267 make sure to cancel the timeout handler, in LoadBalancerService (#3118)
1e90267 is described below

commit 1e902675bcf6160af8d72d8251739477162f8d76
Author: Nick Mitchell <star...@users.noreply.github.com>
AuthorDate: Thu Dec 21 16:28:47 2017 -0500

    make sure to cancel the timeout handler, in LoadBalancerService (#3118)

    Make sure to cancel the timeout handler on the active ack when a response is received. This reduces memory drag.
---
 .../whisk/core/loadBalancer/LoadBalancerData.scala | 3 +++
 .../core/loadBalancer/LoadBalancerService.scala | 20 ++++++++++++++------
 .../loadBalancer/test/LoadBalancerDataTests.scala | 13 ++++++++++---
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala index 1866d2d..0018cbb 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala @@ -19,11 +19,14 @@ package whisk.core.loadBalancer import whisk.core.entity.{ActivationId, InstanceId, UUID, WhiskActivation} +import akka.actor.Cancellable import scala.concurrent.{Future, Promise} +// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. 
Success case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: InstanceId, + timeoutHandler: Cancellable, promise: Promise[Either[ActivationId, WhiskActivation]]) trait LoadBalancerData { diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 17d8796..29a169f 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -154,6 +154,7 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. invokerPool ! InvocationFinishedMessage(invoker, isSuccess) if (!forced) { + entry.timeoutHandler.cancel() entry.promise.trySuccess(response) } else { entry.promise.tryFailure(new Throwable("no active ack received")) @@ -187,13 +188,20 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore // in this case, if the activation handler is still registered, remove it and update the books. 
// in case of missing synchronization between n controllers in HA configuration the invoker queue can be overloaded // n-1 times and the maximal time for answering with active ack can be n times the action time (plus some overhead) - loadBalancerData.putActivation(activationId, { - actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(Left(activationId), transid, forced = true, invoker = invokerName) - } + loadBalancerData.putActivation( + activationId, { + val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) { + processCompletion(Left(activationId), transid, forced = true, invoker = invokerName) + } - ActivationEntry(activationId, namespaceId, invokerName, Promise[Either[ActivationId, WhiskActivation]]()) - }) + // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success + ActivationEntry( + activationId, + namespaceId, + invokerName, + timeoutHandler, + Promise[Either[ActivationId, WhiskActivation]]()) + }) } /** diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala index af51102..9afa67f 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerDataTests.scala @@ -18,6 +18,7 @@ package whisk.core.loadBalancer.test import akka.actor.ActorSystem +import akka.actor.Cancellable import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import common.StreamLogging import org.scalatest.{FlatSpec, Matchers} @@ -30,10 +31,16 @@ import whisk.core.entity.InstanceId import scala.concurrent.duration._ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { + final val emptyCancellable: Cancellable = new Cancellable { + def isCancelled = false + def cancel() = true + } val activationIdPromise = Promise[Either[ActivationId, WhiskActivation]]() - val firstEntry: ActivationEntry = 
ActivationEntry(ActivationId(), UUID(), InstanceId(0), activationIdPromise) - val secondEntry: ActivationEntry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise) + val firstEntry: ActivationEntry = + ActivationEntry(ActivationId(), UUID(), InstanceId(0), emptyCancellable, activationIdPromise) + val secondEntry: ActivationEntry = + ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise) val port = 2552 val host = "127.0.0.1" @@ -149,7 +156,7 @@ class LoadBalancerDataTests extends FlatSpec with Matchers with StreamLogging { it should "respond with different values accordingly" in { - val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), activationIdPromise) + val entry = ActivationEntry(ActivationId(), UUID(), InstanceId(1), emptyCancellable, activationIdPromise) val entrySameInvokerAndNamespace = entry.copy(id = ActivationId()) val entrySameInvoker = entry.copy(id = ActivationId(), namespaceId = UUID()) -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].