This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 59b67fe  Implement InvokerHealthyManager (#5061)
59b67fe is described below

commit 59b67fe96f44e573f3348afed966a1cdaf80ddf2
Author: ningyougang <[email protected]>
AuthorDate: Tue Mar 23 01:09:17 2021 +0800

    Implement InvokerHealthyManager (#5061)
---
 .../org/apache/openwhisk/common/Message.scala      |  21 +
 .../apache/openwhisk/common/TransactionId.scala    |   2 +
 .../apache/openwhisk/core/connector/Message.scala  |  27 ++
 .../apache/openwhisk/core/entity/InstanceId.scala  |  26 +-
 core/invoker/build.gradle                          |   1 +
 .../containerpool/v2/ActivationClientProxy.scala   |  33 ++
 .../v2/FunctionPullingContainerProxy.scala         | 106 +++++
 .../containerpool/v2/InvokerHealthManager.scala    | 381 +++++++++++++++++
 tests/build.gradle                                 |   1 +
 .../v2/test/InvokerHealthManagerTests.scala        | 453 +++++++++++++++++++++
 .../core/entity/test/InvokerInstanceIdTests.scala  |  14 +-
 11 files changed, 1059 insertions(+), 6 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
new file mode 100644
index 0000000..e425105
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+case object GracefulShutdown
+case object Enable
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index e8e58b6..f7df2d0 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -236,6 +236,7 @@ object TransactionId {
   val unknown = TransactionId(systemPrefix + "unknown")
   val testing = TransactionId(systemPrefix + "testing") // Common id for for 
unit testing
   val invoker = TransactionId(systemPrefix + "invoker") // Invoker 
startup/shutdown or GC activity
+  val invokerHealthManager = TransactionId(systemPrefix + 
"invokerHealthManager") // Invoker startup/shutdown or GC activity
   val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker 
warmup thread that makes stem-cell containers
   val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker 
nanny thread
   val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message 
dispatcher
@@ -244,6 +245,7 @@ object TransactionId {
   val controller = TransactionId(systemPrefix + "controller") // Controller 
startup
   val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
   val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
+  def invokerHealthActivation = TransactionId(systemPrefix + 
"invokerHealthActivation") // Invoker health activation
 
   private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 6052258..bcd56e0 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -428,6 +428,33 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
+}
+
+object InvokerResourceMessage extends DefaultJsonProtocol {
+  def parse(msg: String): Try[InvokerResourceMessage] = 
Try(serdes.read(msg.parseJson))
+  implicit val serdes =
+    jsonFormat(
+      InvokerResourceMessage.apply _,
+      "status",
+      "freeMemory",
+      "busyMemory",
+      "inProgressMemory",
+      "tags",
+      "dedicatedNamespaces")
+}
+
 /**
  * This case class is used when retrieving the snapshot of the queue status 
from the scheduler at a certain moment.
  * This is useful to figure out the internal status when any issue happens.
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index 0421e9b..34eadbd 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic 
creation
  * @param uniqueName an identifier required for dynamic instance assignment by 
Zookeeper
  * @param displayedName an identifier that is required for the health protocol 
to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on 
this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on 
this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,
+                             val busyMemory: Option[ByteSize] = None,
+                             val tags: Seq[String] = Seq.empty[String],
+                             val dedicatedNamespaces: Seq[String] = Seq.empty)
     extends InstanceId {
   def toInt: Int = instance
 
@@ -76,7 +83,12 @@ object InvokerInstanceId extends DefaultJsonProtocol {
       val fields = new ListBuffer[(String, JsValue)]
       fields ++= List("instance" -> JsNumber(i.instance))
       fields ++= List("userMemory" -> JsString(i.userMemory.toString))
+      i.busyMemory.foreach { busyMemory =>
+        fields ++= List("busyMemory" -> JsString(busyMemory.toString))
+      }
       fields ++= List("instanceType" -> JsString(i.instanceType))
+      fields ++= List("tags" -> JsArray(i.tags.map(_.toJson): _*))
+      fields ++= List("dedicatedNamespaces" -> 
JsArray(i.dedicatedNamespaces.map(_.toJson): _*))
       i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" -> 
JsString(uniqueName)))
       i.displayedName.foreach(displayedName => fields ++= List("displayedName" 
-> JsString(displayedName)))
       JsObject(fields.toSeq: _*)
@@ -87,10 +99,20 @@ object InvokerInstanceId extends DefaultJsonProtocol {
       val uniqueName = fromField[Option[String]](json, "uniqueName")
       val displayedName = fromField[Option[String]](json, "displayedName")
       val userMemory = fromField[String](json, "userMemory")
+      val busyMemory = fromField[Option[String]](json, "busyMemory")
       val instanceType = fromField[String](json, "instanceType")
+      val tags = fromField[Seq[String]](json, "tags")
+      val dedicatedNamespaces = fromField[Seq[String]](json, 
"dedicatedNamespaces")
 
       if (instanceType == "invoker") {
-        new InvokerInstanceId(instance, uniqueName, displayedName, 
ByteSize.fromString(userMemory))
+        new InvokerInstanceId(
+          instance,
+          uniqueName,
+          displayedName,
+          ByteSize.fromString(userMemory),
+          busyMemory.map(ByteSize.fromString(_)),
+          tags,
+          dedicatedNamespaces)
       } else {
         deserializationError("could not read InvokerInstanceId")
       }
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 1bf2984..06560f0 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -37,6 +37,7 @@ distDockerCoverage.dependsOn 
':common:scala:scoverageClasses', 'scoverageClasses
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
+    compile project(':core:scheduler')
 
     compile ("org.apache.curator:curator-recipes:${gradle.curator.version}") {
         exclude group: 'org.apache.zookeeper', module:'zookeeper'
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
new file mode 100644
index 0000000..9da672b
--- /dev/null
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+
+// Event send by the actor
+case class ClientCreationCompleted(client: Option[ActorRef] = None)
+case object ClientClosed
+case object CloseClientProxy
+
+// Event received by the actor
+case object StartClient
+case class RequestActivation(lastDuration: Option[Long] = None, newScheduler: 
Option[SchedulerEndpoints] = None)
+
+// TODO, use grpc to fetch activation from memoryQueue
+class ActivationClientProxy {}
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
new file mode 100644
index 0000000..2a33a0d
--- /dev/null
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, 
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object CreatingContainer extends ProxyState
+case object ContainerCreated extends ProxyState
+case object CreatingClient extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NonexistentData() extends Data(0.B) {
+  override def getContainer = None
+}
+case class MemoryData(override val memoryLimit: ByteSize) extends 
Data(memoryLimit) {
+  override def getContainer = None
+}
+trait WithClient { val clientProxy: ActorRef }
+case class PreWarmData(container: Container, kind: String, override val 
memoryLimit: ByteSize)
+    extends Data(memoryLimit) {
+  override def getContainer = Some(container)
+}
+
+case class ContainerCreatedData(container: Container, invocationNamespace: 
String, action: ExecutableWhiskAction)
+    extends Data(action.limits.memory.megabytes.MB) {
+  override def getContainer = Some(container)
+}
+
+case class InitializedData(container: Container,
+                           invocationNamespace: String,
+                           action: ExecutableWhiskAction,
+                           override val clientProxy: ActorRef)
+    extends Data(action.limits.memory.megabytes.MB)
+    with WithClient {
+  override def getContainer = Some(container)
+}
+
+case class WarmData(container: Container,
+                    invocationNamespace: String,
+                    action: ExecutableWhiskAction,
+                    revision: DocRevision,
+                    lastUsed: Instant,
+                    override val clientProxy: ActorRef)
+    extends Data(action.limits.memory.megabytes.MB)
+    with WithClient {
+  override def getContainer = Some(container)
+}
+
+// TODO
+class FunctionPullingContainerProxy {}
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
new file mode 100644
index 0000000..8372fc1
--- /dev/null
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, 
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, 
ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit 
actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the 
invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested 
lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: 
${t}")
+      goto(Offline)
+  }
+
+  // It is important to note that stateName and the stateData in onTransition 
callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case oldState -> newState if oldState != newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+      unstashAll()
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: 
InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        dataManagementService ! 
UpdateDataOnChange(InvokerKeys.health(instanceId), 
invokerResourceMessage.serialize)
+        stay using data.copy(currentInvokerResource = 
Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = 
InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, 
action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: 
${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): 
Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then 
healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State 
= {
+    buffer.add(state)
+    val falseStateCount = buffer.toList.count(_ == false)
+    if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
+      gotoIfNotThere(Healthy)
+    } else {
+      logging.warn(
+        this,
+        s"become unhealthy because system error exceeded the error tolerance, 
falseStateCount $falseStateCount, errorTolerance 
${InvokerHealthManager.bufferErrorTolerance}")
+      gotoIfNotThere(Unhealthy)
+    }
+  }
+
+  /**
+   * This is to decide wether to change from the newState or not.
+   * If current state is already newState, it will stay, otherwise it will 
change its state.
+   *
+   * @param newState the desired state to change.
+   * @return
+   */
+  private def gotoIfNotThere(newState: InvokerState) = {
+    if (stateName == newState) {
+      stay()
+    } else {
+      goto(newState)
+    }
+  }
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+}
+
+case class HealthActivationServiceClient() extends Actor {
+
+  private var closed: Boolean = false
+
+  override def receive: Receive = {
+    case StartClient => sender() ! ClientCreationCompleted()
+    case _: RequestActivation =>
+      InvokerHealthManager.healthActivation match {
+        case Some(activation) if !closed =>
+          sender() ! activation.copy(
+            transid = TransactionId.invokerHealthActivation,
+            activationId = ActivationId.generate())
+
+        case _ if closed =>
+          context.parent ! ClientClosed
+          context.stop(self)
+
+        case _ => // do nothing
+      }
+
+    case CloseClientProxy =>
+      closed = true
+
+  }
+}
+
+object InvokerHealthManager {
+  val healthActionNamePrefix = "invokerHealthTestAction"
+  val bufferSize = 10
+  val bufferErrorTolerance = 3
+  val healthActionIdentity: Identity = {
+    val whiskSystem = "whisk.system"
+    val uuid = UUID()
+    Identity(
+      Subject(whiskSystem),
+      Namespace(EntityName(whiskSystem), uuid),
+      BasicAuthenticationAuthKey(uuid, Secret()),
+      Set[Privilege]())
+  }
+
+  def healthAction(i: InvokerInstanceId): Option[WhiskAction] =
+    ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default").map 
{ manifest =>
+      new WhiskAction(
+        namespace = 
InvokerHealthManager.healthActionIdentity.namespace.name.toPath,
+        name = EntityName(s"$healthActionNamePrefix${i.toInt}"),
+        exec = CodeExecAsString(manifest, """function main(params) { return 
params; }""", None),
+        limits = ActionLimits(memory = MemoryLimit(MemoryLimit.MIN_MEMORY), 
logs = LogLimit(0.B)))
+    }
+
+  var healthActivation: Option[ActivationMessage] = None
+
+  private def createTestActionForInvokerHealth(db: EntityStore, action: 
WhiskAction): Future[DocInfo] = {
+    implicit val tid: TransactionId = TransactionId.invokerHealthManager
+    implicit val ec: ExecutionContext = db.executionContext
+    implicit val logging: Logging = db.logging
+
+    WhiskAction
+      .get(db, action.docid)
+      .flatMap { oldAction =>
+        WhiskAction.put(db, action.revision(oldAction.rev), 
Some(oldAction))(tid, notifier = None)
+      }
+      .recoverWith {
+        case _: NoDocumentException => WhiskAction.put(db, action, old = 
None)(tid, notifier = None)
+      }
+      .andThen {
+        case Success(_) => logging.info(this, "test action for invoker health 
now exists")
+        case Failure(e) => logging.error(this, s"error creating test action 
for invoker health: $e")
+      }
+  }
+
+  private def createHealthActivation(entityStore: ArtifactStore[WhiskEntity],
+                                     docInfo: DocInfo)(implicit ec: 
ExecutionContext, logging: Logging) = {
+    implicit val transId = TransactionId.invokerHealth
+
+    WhiskAction.get(entityStore, docInfo.id).onComplete {
+      case Success(action) =>
+        healthActivation = Some(
+          ActivationMessage(
+            TransactionId.invokerHealth,
+            action.toExecutableWhiskAction.get.fullyQualifiedName(true),
+            action.rev,
+            healthActionIdentity,
+            ActivationId.generate(),
+            ControllerInstanceId("health"),
+            blocking = false,
+            content = None))
+      case Failure(t) => logging.error(this, s"get health action error: 
${t.getMessage}")
+    }
+  }
+
+  def prepare(entityStore: ArtifactStore[WhiskEntity],
+              invokerInstanceId: InvokerInstanceId)(implicit ec: 
ExecutionContext, logging: Logging): Future[Unit] = {
+    InvokerHealthManager.healthAction(invokerInstanceId) match {
+      case Some(action) =>
+        createTestActionForInvokerHealth(entityStore, action)
+          .map(docId => createHealthActivation(entityStore, docId))
+      case None =>
+        throw new IllegalStateException(
+          "cannot create test action for invoker health because runtime 
manifest is not valid")
+    }
+  }
+
+  def props(instanceId: InvokerInstanceId,
+            childFactory: (ActorRefFactory, ActorRef) => ActorRef,
+            dataManagementService: ActorRef,
+            entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: 
ActorSystem, logging: Logging): Props = {
+    Props(new InvokerHealthManager(instanceId, childFactory, 
dataManagementService, entityStore))
+  }
+}
+
+// States an Invoker can be in
+sealed trait InvokerState {
+  val asString: String
+}
+
+case object Offline extends InvokerState {
+  val asString = "down"
+}
+
+case object Healthy extends InvokerState {
+  val asString = "up"
+}
+
+case object Unhealthy extends InvokerState {
+  val asString = "unhealthy"
+}
+
+//recevied from ContainerProxy actor
+case class HealthMessage(state: Boolean)
+
+//rereived from ContainerPool actor
+case class MemoryInfo(freeMemory: Long, busyMemory: Long, inProgressMemory: 
Long)
+
+// Data stored in the Invoker
+sealed class InvokerHealthData
+
+case class InvokerInfo(buffer: RingBuffer[Boolean],
+                       memory: MemoryInfo = MemoryInfo(0, 0, 0),
+                       currentInvokerResource: Option[InvokerResourceMessage] 
= None)
+    extends InvokerHealthData
diff --git a/tests/build.gradle b/tests/build.gradle
index 2d82234..60c7b3f 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -89,6 +89,7 @@ ext.testSets = [
     "REQUIRE_SCHEDULER" : [
         "includes" : [
             "org/apache/openwhisk/common/etcd/**",
+            "org/apache/openwhisk/core/containerpool/v2/test/**",
             "org/apache/openwhisk/core/scheduler/**",
             "org/apache/openwhisk/core/service/**",
         ]
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
new file mode 100644
index 0000000..8ab5946
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestActor, TestFSMRef, TestKit, TestProbe}
+import common.StreamLogging
+import org.apache.openwhisk.common.{Enable, GracefulShutdown, RingBuffer}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.InvokerResourceMessage
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId, 
WhiskEntityStore}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, 
Matchers}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+@RunWith(classOf[JUnitRunner])
+class InvokerHealthManagerTests
+    extends TestKit(ActorSystem("InvokerHealthManager"))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockFactory
+    with ScalaFutures
+    with StreamLogging
+    with DbUtils {
+
+  override def afterAll = TestKit.shutdownActorSystem(system)
+
+  implicit val mt = ActorMaterializer()
+  implicit val ec = system.dispatcher
+
+  val config = new WhiskConfig(ExecManifest.requiredProperties)
+
+  ExecManifest.initialize(config) should be a 'success
+
+  val timeout = 10.seconds
+
+  val instanceId = InvokerInstanceId(0, userMemory = 1024.MB, tags = 
Seq("gpu-enabled", "high-memory"))
+
+  val entityStore = WhiskEntityStore.datastore()
+
+  val freeMemory: Long = 512
+
+  val busyMemory: Long = 256
+
+  val inProgressMemory: Long = 256
+
+  /** Creates a sequence of containers and a factory returning this sequence. 
*/
+  def testContainers(n: Int) = {
+    val containers = (0 to n).map(_ => TestProbe())
+    val queue = mutable.Queue(containers: _*)
+    val factory = (fac: ActorRefFactory, manager: ActorRef) => 
queue.dequeue().ref
+    (containers, factory)
+  }
+
+  behavior of "InvokerHealthManager"
+
+  it should "invoke health action and become healthy state" in within(timeout) 
{
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val probe = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    fsm ! Enable
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+
+  it should "invoke health action again when it becomes unhealthy" in 
within(timeout) {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+    val probe = TestProbe()
+    var buffer = new RingBuffer[Boolean](InvokerHealthManager.bufferSize)
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      buffer.add(true)
+    }
+    fsm.setState(Healthy, InvokerInfo(buffer, memory = 
MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Healthy))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    (1 to InvokerHealthManager.bufferErrorTolerance + 1) foreach { _ =>
+      fsm ! HealthMessage(false)
+    }
+
+    probe.expectMsg(Transition(fsm, Healthy, Unhealthy))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+
+  it should "publish healthy status and pool info to etcd together" in 
within(timeout) {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    fsm ! Enable
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    fsm ! MemoryInfo(freeMemory, busyMemory, inProgressMemory)
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          freeMemory,
+          busyMemory,
+          inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+
+  it should "change the invoker pool info to etcd when memory info has 
changes" in within(timeout) {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    fsm ! Enable
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    fsm ! MemoryInfo(freeMemory, busyMemory, inProgressMemory)
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          freeMemory,
+          busyMemory,
+          inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    val changedFreeMemory = freeMemory - 256
+    val changedBusyMemory = busyMemory + 256
+    fsm ! MemoryInfo(changedFreeMemory, changedBusyMemory, inProgressMemory)
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          changedFreeMemory,
+          changedBusyMemory,
+          inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+
+  it should "disable and enable the invoker gracefully" in within(timeout) {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    fsm ! Enable
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    fsm ! GracefulShutdown
+
+    probe.expectMsg(Transition(fsm, Healthy, Offline))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Offline.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    val mockHealthActionProxy = TestProbe()
+    fsm.underlyingActor.healthActionProxy = Some(mockHealthActionProxy.ref)
+
+    fsm ! Enable
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+
+    mockHealthActionProxy.setAutoPilot((sender, msg) =>
+      msg match {
+        case _: Initialize =>
+          (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+            sender ! HealthMessage(true)
+          }
+
+          TestActor.KeepRunning
+
+        case GracefulShutdown =>
+          TestActor.KeepRunning
+
+    })
+    probe.expectMsg(5.seconds, Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+
+  it should "keep status Offline all the time in spite of receive 
healthMessage" in within(timeout) {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    val dataManagementService = TestProbe()
+    val fsm = TestFSMRef(new InvokerHealthManager(instanceId, factory, 
dataManagementService.ref, entityStore))
+
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    fsm ! Enable
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Unhealthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Healthy.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    fsm ! GracefulShutdown
+
+    probe.expectMsg(Transition(fsm, Healthy, Offline))
+
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Offline.asString,
+          instanceId.userMemory.toMB,
+          0,
+          0,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+
+    (1 to InvokerHealthManager.bufferSize - 
InvokerHealthManager.bufferErrorTolerance) foreach { _ =>
+      fsm ! HealthMessage(true)
+    }
+
+    // Keep the status Offline all the time unless enable it first
+    probe.expectNoMessage()
+
+    val changedFreeMemory = freeMemory - 256
+    val changedBusyMemory = busyMemory + 256
+    fsm ! MemoryInfo(changedFreeMemory, changedBusyMemory, inProgressMemory)
+
+    // In spite of the status is Offline, the Memory info may be changed 
during zerodowntime deployment for invoker.
+    dataManagementService.expectMsg(
+      UpdateDataOnChange(
+        InvokerKeys.health(instanceId),
+        InvokerResourceMessage(
+          Offline.asString,
+          changedFreeMemory,
+          changedBusyMemory,
+          inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces).serialize))
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
index b0a1016..fb3a35c 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
@@ -22,7 +22,7 @@ import org.apache.openwhisk.core.entity.{ByteSize, 
InstanceId, InvokerInstanceId
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
-import spray.json.{JsNumber, JsObject, JsString}
+import spray.json.{JsArray, JsNumber, JsObject, JsString}
 
 import scala.util.Success
 
@@ -37,7 +37,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
     i.serialize shouldBe JsObject(
       "instance" -> JsNumber(i.instance),
       "userMemory" -> JsString(i.userMemory.toString),
-      "instanceType" -> JsString(i.instanceType)).compactPrint
+      "instanceType" -> JsString(i.instanceType),
+      "tags" -> JsArray.empty,
+      "dedicatedNamespaces" -> JsArray.empty).compactPrint
     i.serialize shouldBe i.toJson.compactPrint
     InstanceId.parse(i.serialize) shouldBe Success(i)
   }
@@ -48,7 +50,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
       "instance" -> JsNumber(i1.instance),
       "userMemory" -> JsString(i1.userMemory.toString),
       "instanceType" -> JsString(i1.instanceType),
-      "uniqueName" -> JsString(i1.uniqueName.getOrElse(""))).compactPrint
+      "uniqueName" -> JsString(i1.uniqueName.getOrElse("")),
+      "tags" -> JsArray.empty,
+      "dedicatedNamespaces" -> JsArray.empty).compactPrint
     i1.serialize shouldBe i1.toJson.compactPrint
     InstanceId.parse(i1.serialize) shouldBe Success(i1)
 
@@ -62,7 +66,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
       "userMemory" -> JsString(i2.userMemory.toString),
       "instanceType" -> JsString(i2.instanceType),
       "uniqueName" -> JsString(i2.uniqueName.getOrElse("")),
-      "displayedName" -> JsString(i2.displayedName.getOrElse(""))).compactPrint
+      "displayedName" -> JsString(i2.displayedName.getOrElse("")),
+      "tags" -> JsArray.empty,
+      "dedicatedNamespaces" -> JsArray.empty).compactPrint
     i2.serialize shouldBe i2.toJson.compactPrint
     InstanceId.parse(i2.serialize) shouldBe Success(i2)
   }

Reply via email to