ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570695584
##########
File path:
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+ action: ExecutableWhiskAction,
+ schedulerHost: String,
+ rpcPort: Int,
+ transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+ container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+ def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {
Review comment:
Updated accordingly
##########
File path:
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+ healthContainerProxyFactory: (ActorRefFactory, ActorRef) => ActorRef,
+ dataManagementService: ActorRef,
+ entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem: ActorSystem, logging: Logging)
+ extends FSM[InvokerState, InvokerHealthData]
+ with Stash {
+
+ implicit val requestTimeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ implicit val transid: TransactionId = TransactionId.invokerHealth
+
+ private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+ startWith(
+ Offline,
+ InvokerInfo(
+ new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+ memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+ when(Offline) {
+ case Event(GracefulShutdown, _: InvokerInfo) =>
+ logging.warn(this, "Received a graceful shutdown flag, stopping the invoker.")
+ stay
+
+ case Event(Enable, _) =>
+ InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+ startTestAction(self)
+ }
+
+ goto(UnHealthy)
+ }
+
+ when(UnHealthy) {
Review comment:
Updated accordingly
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]