This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6686820 Fixes bug in invoker supervision on startup. (#5050)
6686820 is described below
commit 66868205b52ee65f28756038c44d8df5b96d2bcc
Author: rodric rabbah <[email protected]>
AuthorDate: Tue Jan 26 10:41:53 2021 -0500
Fixes bug in invoker supervision on startup. (#5050)
---
.../core/loadBalancer/InvokerSupervision.scala | 83 ++++++++++++++--------
.../core/containerpool/ContainerPool.scala | 8 ++-
.../test/InvokerSupervisionTests.scala | 51 ++++++++++++-
3 files changed, 108 insertions(+), 34 deletions(-)
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
index 5a9d367..d0a648b 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala
@@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
val healthyTimeout: FiniteDuration = 10.seconds
- // This is done at this point to not intermingle with the state-machine
- // especially their timeouts.
+ // This is done at this point to avoid intermingling with the state machine,
especially its timeouts.
def customReceive: Receive = {
- case _: RecordMetadata => // The response of putting testactions to the
MessageProducer. We don't have to do anything with them.
+ case _: RecordMetadata => // Ignores the result of publishing test actions
to MessageProducer.
}
- override def receive: Receive = customReceive.orElse(super.receive)
- /** Always start UnHealthy. Then the invoker receives some test activations
and becomes Healthy. */
- startWith(Unhealthy, InvokerInfo(new
RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
-
- /** An Offline invoker represents an existing but broken invoker. This
means, that it does not send pings anymore. */
- when(Offline) {
- case Event(_: PingMessage, _) => goto(Unhealthy)
- }
+ override def receive: Receive = customReceive.orElse(super.receive)
// To be used for all states that should send test actions to reverify the
invoker
val healthPingingState: StateFunction = {
@@ -317,6 +309,22 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
stay
}
+ // To be used for all states that should send test actions to reverify the
invoker
+ def healthPingingTransitionHandler(state: InvokerState): TransitionHandler =
{
+ case _ -> `state` =>
+ invokeTestAction()
+ setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
+ case `state` -> _ => cancelTimer(InvokerActor.timerName)
+ }
+
+ /** Always start Unhealthy. Then the invoker receives some test activations
and becomes Healthy. */
+ startWith(Unhealthy, InvokerInfo(new
RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))
+
+ /** An Offline invoker represents an existing but broken invoker. This
means that it no longer sends pings. */
+ when(Offline) {
+ case Event(_: PingMessage, _) => goto(Unhealthy)
+ }
+
/** An Unhealthy invoker represents an invoker that was not able to handle
actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)
@@ -324,20 +332,20 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)
/**
- * A Healthy invoker is characterized by continuously getting pings. It will
go offline if that state is not confirmed
- * for 20 seconds.
+ * A Healthy invoker is characterized by continuously getting pings.
+ * It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
}
- /** Handle the completion of an Activation in every state. */
+ /** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) =>
handleCompletionMessage(cm.result, info.buffer)
}
- /** Logging on Transition change */
+ /** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
transid.mark(
@@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
case _ -> newState if newState.isUsable => logging.info(this, s"$name is
${newState.asString}")
}
- // To be used for all states that should send test actions to reverify the
invoker
- def healthPingingTransitionHandler(state: InvokerState): TransitionHandler =
{
- case _ -> `state` =>
- invokeTestAction()
- setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
- case `state` -> _ => cancelTimer(InvokerActor.timerName)
- }
-
onTransition(healthPingingTransitionHandler(Unhealthy))
onTransition(healthPingingTransitionHandler(Unresponsive))
@@ -372,8 +372,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
buffer:
RingBuffer[InvocationFinishedResult]) = {
buffer.add(result)
- // If the action is successful it seems like the Invoker is Healthy again.
So we execute immediately
- // a new test action to remove the errors out of the RingBuffer as fast as
possible.
+ // If the action is successful, the Invoker is likely healthy again. We execute
an additional test action
+ // immediately to clear errors out of the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely
health actions.
// It is possible they are normal user actions as well. This can happen if
such actions were in the
// invoker queue or in progress while the invoker's status flipped to
Unhealthy.
@@ -381,19 +381,44 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
invokeTestAction()
}
- // Stay in online if the activations was successful.
- // Stay in offline, if an activeAck reaches the controller.
+ // Stay online if the activation was successful.
+ // Stay offline if an activeAck is received (a stale activation) but the
invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success)
|| stateName == Offline) {
stay
} else {
val entries = buffer.toList
- // Goto Unhealthy or Unresponsive respectively if there are more errors
than accepted in buffer, else goto Healthy
+
+ // Go to Unhealthy or Unresponsive, respectively, if the buffer holds more errors
than tolerated at steady state.
+ // Otherwise, transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) >
InvokerActor.bufferErrorTolerance) {
+ // Note: The predicate is false if the ring buffer is still being
primed
+ // (i.e., entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unhealthy)
} else if (entries.count(_ == InvocationFinishedResult.Timeout) >
InvokerActor.bufferErrorTolerance) {
+ // Note: The predicate is false if the ring buffer is still being
primed
+ // (i.e., entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unresponsive)
} else {
- gotoIfNotThere(Healthy)
+ result match {
+ case InvocationFinishedResult.Success =>
+ // Eagerly transition to Healthy, either at steady state (there aren't
sufficient contra-indications) or
+ // during priming of the ring buffer. In the latter case, there
is at least one additional test
+ // action in flight that can reverse the transition later.
+ gotoIfNotThere(Healthy)
+
+ case InvocationFinishedResult.SystemError if (entries.size <=
InvokerActor.bufferErrorTolerance) =>
+ // The ring buffer is not fully primed yet, stay/goto Unhealthy.
+ gotoIfNotThere(Unhealthy)
+
+ case InvocationFinishedResult.Timeout if (entries.size <=
InvokerActor.bufferErrorTolerance) =>
+ // The ring buffer is not fully primed yet; stay in or go to Unresponsive.
+ gotoIfNotThere(Unresponsive)
+
+ case _ =>
+ // At steady state, the state of the buffer takes precedence and we hold
the current state
+ // until enough events have occurred to transition to a new state.
+ stay
+ }
}
}
}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 92f058b..4b66e99 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -270,19 +270,21 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
freePool.get(sender()).foreach { f =>
freePool = freePool - sender()
}
+
// container was busy (busy indicates at full capacity), so there is
capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
processBufferOrFeed()
- //in case this was a prewarm
+ // in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
prewarmedPool = prewarmedPool - sender()
}
- //in case this was a starting prewarm
+
+ // in case this was a starting prewarm
prewarmStartingPool.get(sender()).foreach { _ =>
- logging.info(this, "failed starting prewarm removed")
+ logging.info(this, "failed starting prewarm, removed")
prewarmStartingPool = prewarmStartingPool - sender()
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 1cf7383..7e0d88f 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -207,6 +207,54 @@ class InvokerSupervisionTests
behavior of "InvokerActor"
+ it should "start and stay unhealthy while min threshold is not met" in {
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory =
defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+
+ (1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+ }
+
+ (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance -
1).foreach { _ =>
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Unhealthy
+ }
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Healthy
+ }
+
+ it should "revert to unhealthy during initial startup if there is a failed
test activation" in {
+ assume(InvokerActor.bufferErrorTolerance >= 3)
+
+ val invoker =
+ TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory =
defaultUserMemory), ControllerInstanceId("0")))
+ invoker.stateName shouldBe Unhealthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.Success)
+ invoker.stateName shouldBe Healthy
+
+ invoker ! InvocationFinishedMessage(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ InvocationFinishedResult.SystemError)
+ invoker.stateName shouldBe Unhealthy
+ }
+
// unHealthy -> offline
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes
unhealthy on a successful ping again" in {
@@ -318,7 +366,7 @@ class InvokerSupervisionTests
}
}
- it should "start timer to send testactions when unhealthy" in {
+ it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory =
defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
@@ -337,7 +385,6 @@ class InvokerSupervisionTests
}
it should "initially store invoker status with its full id -
instance/uniqueName/displayedName" in {
-
val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) =>
children.dequeue()