bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r571218281



##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, 
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState

Review comment:
       ```suggestion
   case object CreatingContainer extends ProxyState
   ```

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, 
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, 
ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit 
actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the 
invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+      goto(UnHealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested 
lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: 
${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and the stateData in onTransition 
callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> UnHealthy =>
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case Healthy -> UnHealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(UnHealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(UnHealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: 
InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + 
invokerResourceMessage.inProgressMemory
+        dataManagementService ! 
UpdateDataOnChange(InvokerKeys.health(instanceId), 
invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = 
Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = 
InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, 
action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: 
${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): 
Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then 
healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State 
= {
+    buffer.add(state)
+    val falseStateCount = buffer.toList.count(_ == false)
+    if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
+      gotoIfNotThere(Healthy)
+    } else {
+      logging.warn(
+        this,
+        s"become unhealthy because system error exceeded the error tolerance, 
falseStateCount $falseStateCount, errorTolerance 
${InvokerHealthManager.bufferErrorTolerance}")
+      gotoIfNotThere(UnHealthy)
+    }
+  }
+
+  /**
+   * This is to decide wether to change from the newState or not.
+   * If current state is already newState, it will stay, otherwise it will 
change its state.
+   *
+   * @param newState the desired state to change.
+   * @return
+   */
+  private def gotoIfNotThere(newState: InvokerState) = {
+    if (stateName == newState) {
+      stay()
+    } else {
+      goto(newState)
+    }
+  }
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+}
+
+case class HealthActivationServiceClient() extends Actor {
+
+  private var closed: Boolean = false
+
+  override def receive: Receive = {
+    case StartClient => sender() ! ClientCreationCompleted()
+    case _: RequestActivation =>
+      InvokerHealthManager.healthActivation match {
+        case Some(activation) if !closed =>
+          sender() ! activation.copy(
+            transid = TransactionId.invokerHealthActivation,
+            activationId = ActivationId.generate())
+
+        case _ if closed =>
+          context.parent ! ClientClosed
+          context.stop(self)
+
+        case _ => // do nothing
+      }
+
+    case CloseClientProxy =>
+      closed = true
+
+  }
+}
+
+object InvokerHealthManager {
+  val healthActionNamePrefix = "invokerHealthTestAction"
+  val bufferSize = 10
+  val bufferErrorTolerance = 3
+
+  var useMemory = 0l

Review comment:
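       We shouldn't use `var`s in a singleton object if possible: `useMemory` becomes global mutable state shared by every instance, with no synchronization.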

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is a VERY powerful feature to group invokers depending on the 
operator's needs without creating multiple clusters. I'm glad it's being 
introduced with the new scheduler.

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, 
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, 
ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit 
actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {

Review comment:
       nit: There are a lot of unnecessary blank lines from here on down that we 
can clean up.

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, 
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState

Review comment:
       ```suggestion
   case object CreatingClient extends ProxyState
   ```

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, 
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, 
ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit 
actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the 
invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(Unhealthy)
+  }
+
+  when(Unhealthy) {
+    case Event(ContainerRemoved, _) =>
+      healthActionProxy = None
+      startTestAction(self)
+
+      stay
+
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+
+      stay
+
+    case Event(ContainerCreationFailed(_), _) =>
+      stay
+  }
+
+  when(Healthy) {
+    case Event(msg: FailureMessage, _) =>
+      logging.error(this, s"invoker${instanceId}, status:${stateName} got a 
failure message: ${msg}")
+      goto(Unhealthy)
+  }
+
+  whenUnhandled {
+    case Event(_: Initialized, _) =>
+      // Initialized messages sent by ContainerProxy for HealthManger
+      stay()
+    case Event(ContainerRemoved, _) =>
+      // Drop messages sent by ContainerProxy for HealthManger
+      healthActionProxy = None
+      stay()
+
+    case Event(GracefulShutdown, _) =>
+      self ! GracefulShutdown
+      goto(Offline)
+
+    case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+      if (stateName != Offline) {
+        handleHealthMessage(healthMsg.state, data.buffer)
+      } else {
+        stay
+      }
+
+    case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+      publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+    // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested 
lease not found, we need to get the lease again.
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"Failure happens, restart InvokerHealthManager: 
${t}")
+
+      goto(Offline)
+
+  }
+
+  // It is important to note that stateName and the stateData in onTransition 
callback refer to the previous one.
+  // We should access to the next data with nextStateData
+  onTransition {
+    case Offline -> Unhealthy =>
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case Healthy -> Unhealthy =>
+      unstashAll()
+      transid.mark(
+        this,
+        LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+        s"invoker${instanceId.toInt} is unhealthy",
+        akka.event.Logging.WarningLevel)
+      startTestAction(self)
+      publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+    case _ -> Healthy =>
+      logging.info(this, s"invoker became healthy, stop health action proxy.")
+      unstashAll()
+      stopTestAction()
+
+      publishHealthStatusAndStay(Healthy, nextStateData)
+
+    case Offline -> Offline =>
+    // this is an initial transition due to startWith, do nothing
+
+    case _ -> newState =>
+      publishHealthStatusAndStay(newState, nextStateData)
+
+      unstashAll()
+
+  }
+
+  private def publishHealthStatusAndStay(state: InvokerState, stateData: 
InvokerHealthData) = {
+    stateData match {
+      case data: InvokerInfo =>
+        val invokerResourceMessage = InvokerResourceMessage(
+          state.asString,
+          data.memory.freeMemory,
+          data.memory.busyMemory,
+          data.memory.inProgressMemory,
+          instanceId.tags,
+          instanceId.dedicatedNamespaces)
+        InvokerHealthManager.useMemory = invokerResourceMessage.busyMemory + 
invokerResourceMessage.inProgressMemory
+        dataManagementService ! 
UpdateDataOnChange(InvokerKeys.health(instanceId), 
invokerResourceMessage.serialize)
+
+        stay using data.copy(currentInvokerResource = 
Some(invokerResourceMessage))
+
+      case data =>
+        logging.error(this, s"unexpected data is found: $data")
+
+        stay
+    }
+  }
+
+  initialize()
+
+  private def startTestAction(manager: ActorRef): Unit = {
+    val namespace = 
InvokerHealthManager.healthActionIdentity.namespace.name.asString
+    val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+    WhiskAction.get(entityStore, docId).onComplete {
+      case Success(action) =>
+        val initialize = Initialize(namespace, 
action.toExecutableWhiskAction.get, "", 0, transid)
+        startHealthAction(initialize, manager)
+      case Failure(t) => logging.error(this, s"get health action error: 
${t.getMessage}")
+    }
+  }
+
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): 
Unit = {
+    healthActionProxy match {
+      case Some(proxy) =>
+        // make healthContainerProxy's status is Running, then 
healthContainerProxy can fetch the activation using ActivationServiceClient
+        proxy ! initialize
+      case None =>
+        val proxy = healthContainerProxyFactory(context, manager)
+        proxy ! initialize
+        healthActionProxy = Some(proxy)
+    }
+  }
+
+  def stopTestAction(): Unit = {
+    healthActionProxy.foreach {
+      healthActionProxy = None
+      _ ! GracefulShutdown
+    }
+  }
+
+  /**
+   * This method is to handle health message from ContainerProxy.pub
+   * It can induce status change.
+   *
+   * @param state  activation result state
+   * @param buffer RingBuffer to track status
+   * @return
+   */
+  def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State 
= {

Review comment:
       In the existing invoker supervision, the ring buffer also includes the 
results of user actions so that it can detect system errors. I can't tell 
without seeing more of the new scheduler code, but we'll still have those 
checks elsewhere, right? Also, since this ring buffer seems to include only 
health-check activations, can this be more aggressive? I would think you could 
just go unhealthy if a single one fails, rather than waiting for three of the last ten.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to