bdoyle0182 commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570470462
##########
File path:
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props,
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+ healthContainerProxyFactory: (ActorRefFactory,
ActorRef) => ActorRef,
+ dataManagementService: ActorRef,
+ entityStore: ArtifactStore[WhiskEntity])(implicit
actorSystem: ActorSystem, logging: Logging)
+ extends FSM[InvokerState, InvokerHealthData]
+ with Stash {
+
+ implicit val requestTimeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ implicit val transid: TransactionId = TransactionId.invokerHealth
+
+ private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+ startWith(
+ Offline,
+ InvokerInfo(
+ new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+ memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+ when(Offline) {
+ case Event(GracefulShutdown, _: InvokerInfo) =>
+ logging.warn(this, "Received a graceful shutdown flag, stopping the
invoker.")
+ stay
+
+ case Event(Enable, _) =>
+ InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+ startTestAction(self)
+ }
+
+ goto(UnHealthy)
+ }
+
+ when(UnHealthy) {
Review comment:
```suggestion
when(Unhealthy) {
```
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(format.read(msg.parseJson))
}
+case class InvokerResourceMessage(status: String,
+ freeMemory: Long,
+ busyMemory: Long,
+ inProgressMemory: Long,
+ tags: Seq[String],
Review comment:
What are tags used for?
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
* @param instance a numeric value used for the load balancing and Kafka topic
creation
* @param uniqueName an identifier required for dynamic instance assignment by
Zookeeper
* @param displayedName an identifier that is required for the health protocol
to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on
this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on
this invoker
*/
case class InvokerInstanceId(val instance: Int,
uniqueName: Option[String] = None,
displayedName: Option[String] = None,
- val userMemory: ByteSize)
+ val userMemory: ByteSize,
Review comment:
Is this going to affect the message bus? As always, I have to check how
this will affect rolling restarts of the controllers and invokers: will one
component be unhealthy while the other is being upgraded?
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(format.read(msg.parseJson))
}
+case class InvokerResourceMessage(status: String,
+ freeMemory: Long,
+ busyMemory: Long,
+ inProgressMemory: Long,
+ tags: Seq[String],
+ dedicatedNamespaces: Seq[String])
+ extends Message {
+
+ /**
+ * Serializes message to string. Must be idempotent.
+ */
+ override def serialize: String =
InvokerResourceMessage.serdes.write(this).compactPrint
+
+ def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+ override def equals(that: Any): Boolean =
+ that match {
+ case that: InvokerResourceMessage => {
+ that.canEqual(this) &&
+ this.status == that.status &&
+ this.freeMemory == that.freeMemory &&
+ this.busyMemory == that.busyMemory &&
+ this.inProgressMemory == that.inProgressMemory &&
+ this.tags.toSet == that.tags.toSet
+ this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+ }
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ var result = 1;
Review comment:
nit: you can map over each one to make this functional rather than using a
var, e.g.
`1.map(start => prime * start + status.hashCode()).map(next => prime * next
+ freeMemory.hashCode())...`
##########
File path:
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision,
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+ action: ExecutableWhiskAction,
+ schedulerHost: String,
+ rpcPort: Int,
+ transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+ container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+ def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {
Review comment:
```suggestion
case class NonexistentData() extends Data(0.B) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]