This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 59b67fe Implement InvokerHealthyManager (#5061)
59b67fe is described below
commit 59b67fe96f44e573f3348afed966a1cdaf80ddf2
Author: ningyougang <[email protected]>
AuthorDate: Tue Mar 23 01:09:17 2021 +0800
Implement InvokerHealthyManager (#5061)
---
.../org/apache/openwhisk/common/Message.scala | 21 +
.../apache/openwhisk/common/TransactionId.scala | 2 +
.../apache/openwhisk/core/connector/Message.scala | 27 ++
.../apache/openwhisk/core/entity/InstanceId.scala | 26 +-
core/invoker/build.gradle | 1 +
.../containerpool/v2/ActivationClientProxy.scala | 33 ++
.../v2/FunctionPullingContainerProxy.scala | 106 +++++
.../containerpool/v2/InvokerHealthManager.scala | 381 +++++++++++++++++
tests/build.gradle | 1 +
.../v2/test/InvokerHealthManagerTests.scala | 453 +++++++++++++++++++++
.../core/entity/test/InvokerInstanceIdTests.scala | 14 +-
11 files changed, 1059 insertions(+), 6 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
new file mode 100644
index 0000000..e425105
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Message.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.common
+
+case object GracefulShutdown
+case object Enable
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
index e8e58b6..f7df2d0 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala
@@ -236,6 +236,7 @@ object TransactionId {
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for for
unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker
startup/shutdown or GC activity
+ val invokerHealthManager = TransactionId(systemPrefix +
"invokerHealthManager") // Invoker health manager activity
val invokerWarmup = TransactionId(systemPrefix + "invokerWarmup") // Invoker
warmup thread that makes stem-cell containers
val invokerNanny = TransactionId(systemPrefix + "invokerNanny") // Invoker
nanny thread
val dispatcher = TransactionId(systemPrefix + "dispatcher") // Kafka message
dispatcher
@@ -244,6 +245,7 @@ object TransactionId {
val controller = TransactionId(systemPrefix + "controller") // Controller
startup
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
+ def invokerHealthActivation = TransactionId(systemPrefix +
"invokerHealthActivation") // Invoker health activation
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index 6052258..bcd56e0 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -428,6 +428,33 @@ object EventMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(format.read(msg.parseJson))
}
+case class InvokerResourceMessage(status: String,
+ freeMemory: Long,
+ busyMemory: Long,
+ inProgressMemory: Long,
+ tags: Seq[String],
+ dedicatedNamespaces: Seq[String])
+ extends Message {
+
+ /**
+ * Serializes message to string. Must be idempotent.
+ */
+ override def serialize: String =
InvokerResourceMessage.serdes.write(this).compactPrint
+}
+
+object InvokerResourceMessage extends DefaultJsonProtocol {
+ def parse(msg: String): Try[InvokerResourceMessage] =
Try(serdes.read(msg.parseJson))
+ implicit val serdes =
+ jsonFormat(
+ InvokerResourceMessage.apply _,
+ "status",
+ "freeMemory",
+ "busyMemory",
+ "inProgressMemory",
+ "tags",
+ "dedicatedNamespaces")
+}
+
/**
* This case class is used when retrieving the snapshot of the queue status
from the scheduler at a certain moment.
* This is useful to figure out the internal status when any issue happens.
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
index 0421e9b..34eadbd 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
@@ -29,11 +29,18 @@ import scala.util.Try
* @param instance a numeric value used for the load balancing and Kafka topic
creation
* @param uniqueName an identifier required for dynamic instance assignment by
Zookeeper
* @param displayedName an identifier that is required for the health protocol
to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions that include the specified annotation tags can be run on
this invoker
+ * @param dedicatedNamespaces only actions from these dedicated namespaces can be
run on this invoker
*/
case class InvokerInstanceId(val instance: Int,
uniqueName: Option[String] = None,
displayedName: Option[String] = None,
- val userMemory: ByteSize)
+ val userMemory: ByteSize,
+ val busyMemory: Option[ByteSize] = None,
+ val tags: Seq[String] = Seq.empty[String],
+ val dedicatedNamespaces: Seq[String] = Seq.empty)
extends InstanceId {
def toInt: Int = instance
@@ -76,7 +83,12 @@ object InvokerInstanceId extends DefaultJsonProtocol {
val fields = new ListBuffer[(String, JsValue)]
fields ++= List("instance" -> JsNumber(i.instance))
fields ++= List("userMemory" -> JsString(i.userMemory.toString))
+ i.busyMemory.foreach { busyMemory =>
+ fields ++= List("busyMemory" -> JsString(busyMemory.toString))
+ }
fields ++= List("instanceType" -> JsString(i.instanceType))
+ fields ++= List("tags" -> JsArray(i.tags.map(_.toJson): _*))
+ fields ++= List("dedicatedNamespaces" ->
JsArray(i.dedicatedNamespaces.map(_.toJson): _*))
i.uniqueName.foreach(uniqueName => fields ++= List("uniqueName" ->
JsString(uniqueName)))
i.displayedName.foreach(displayedName => fields ++= List("displayedName"
-> JsString(displayedName)))
JsObject(fields.toSeq: _*)
@@ -87,10 +99,20 @@ object InvokerInstanceId extends DefaultJsonProtocol {
val uniqueName = fromField[Option[String]](json, "uniqueName")
val displayedName = fromField[Option[String]](json, "displayedName")
val userMemory = fromField[String](json, "userMemory")
+ val busyMemory = fromField[Option[String]](json, "busyMemory")
val instanceType = fromField[String](json, "instanceType")
+ val tags = fromField[Seq[String]](json, "tags")
+ val dedicatedNamespaces = fromField[Seq[String]](json,
"dedicatedNamespaces")
if (instanceType == "invoker") {
- new InvokerInstanceId(instance, uniqueName, displayedName,
ByteSize.fromString(userMemory))
+ new InvokerInstanceId(
+ instance,
+ uniqueName,
+ displayedName,
+ ByteSize.fromString(userMemory),
+ busyMemory.map(ByteSize.fromString(_)),
+ tags,
+ dedicatedNamespaces)
} else {
deserializationError("could not read InvokerInstanceId")
}
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index 1bf2984..06560f0 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -37,6 +37,7 @@ distDockerCoverage.dependsOn
':common:scala:scoverageClasses', 'scoverageClasses
dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')
+ compile project(':core:scheduler')
compile ("org.apache.curator:curator-recipes:${gradle.curator.version}") {
exclude group: 'org.apache.zookeeper', module:'zookeeper'
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
new file mode 100644
index 0000000..9da672b
--- /dev/null
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/ActivationClientProxy.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+
+// Events sent by the actor
+case class ClientCreationCompleted(client: Option[ActorRef] = None)
+case object ClientClosed
+case object CloseClientProxy
+
+// Event received by the actor
+case object StartClient
+case class RequestActivation(lastDuration: Option[Long] = None, newScheduler:
Option[SchedulerEndpoints] = None)
+
+// TODO, use grpc to fetch activation from memoryQueue
+class ActivationClientProxy {}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
new file mode 100644
index 0000000..2a33a0d
--- /dev/null
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision,
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+ action: ExecutableWhiskAction,
+ schedulerHost: String,
+ rpcPort: Int,
+ transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+ container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object CreatingContainer extends ProxyState
+case object ContainerCreated extends ProxyState
+case object CreatingClient extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+ def getContainer: Option[Container]
+}
+case class NonexistentData() extends Data(0.B) {
+ override def getContainer = None
+}
+case class MemoryData(override val memoryLimit: ByteSize) extends
Data(memoryLimit) {
+ override def getContainer = None
+}
+trait WithClient { val clientProxy: ActorRef }
+case class PreWarmData(container: Container, kind: String, override val
memoryLimit: ByteSize)
+ extends Data(memoryLimit) {
+ override def getContainer = Some(container)
+}
+
+case class ContainerCreatedData(container: Container, invocationNamespace:
String, action: ExecutableWhiskAction)
+ extends Data(action.limits.memory.megabytes.MB) {
+ override def getContainer = Some(container)
+}
+
+case class InitializedData(container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction,
+ override val clientProxy: ActorRef)
+ extends Data(action.limits.memory.megabytes.MB)
+ with WithClient {
+ override def getContainer = Some(container)
+}
+
+case class WarmData(container: Container,
+ invocationNamespace: String,
+ action: ExecutableWhiskAction,
+ revision: DocRevision,
+ lastUsed: Instant,
+ override val clientProxy: ActorRef)
+ extends Data(action.limits.memory.megabytes.MB)
+ with WithClient {
+ override def getContainer = Some(container)
+}
+
+// TODO
+class FunctionPullingContainerProxy {}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
new file mode 100644
index 0000000..8372fc1
--- /dev/null
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthManager.scala
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props,
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+ healthContainerProxyFactory: (ActorRefFactory,
ActorRef) => ActorRef,
+ dataManagementService: ActorRef,
+ entityStore: ArtifactStore[WhiskEntity])(implicit
actorSystem: ActorSystem, logging: Logging)
+ extends FSM[InvokerState, InvokerHealthData]
+ with Stash {
+
+ implicit val requestTimeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ implicit val transid: TransactionId = TransactionId.invokerHealth
+
+ private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+ startWith(
+ Offline,
+ InvokerInfo(
+ new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+ memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+ when(Offline) {
+ case Event(GracefulShutdown, _: InvokerInfo) =>
+ logging.warn(this, "Received a graceful shutdown flag, stopping the
invoker.")
+ stay
+
+ case Event(Enable, _) =>
+ InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+ startTestAction(self)
+ }
+ goto(Unhealthy)
+ }
+
+ when(Unhealthy) {
+ case Event(ContainerRemoved, _) =>
+ healthActionProxy = None
+ startTestAction(self)
+ stay
+
+ case Event(msg: FailureMessage, _) =>
+ logging.error(this, s"invoker${instanceId}, status:${stateName} got a
failure message: ${msg}")
+ stay
+
+ case Event(ContainerCreationFailed(_), _) =>
+ stay
+ }
+
+ when(Healthy) {
+ case Event(msg: FailureMessage, _) =>
+ logging.error(this, s"invoker${instanceId}, status:${stateName} got a
failure message: ${msg}")
+ goto(Unhealthy)
+ }
+
+ whenUnhandled {
+ case Event(_: Initialized, _) =>
+ // Initialized messages sent by ContainerProxy for HealthManager
+ stay()
+
+ case Event(ContainerRemoved, _) =>
+ // Drop messages sent by ContainerProxy for HealthManager
+ healthActionProxy = None
+ stay()
+
+ case Event(GracefulShutdown, _) =>
+ self ! GracefulShutdown
+ goto(Offline)
+
+ case Event(healthMsg: HealthMessage, data: InvokerInfo) =>
+ if (stateName != Offline) {
+ handleHealthMessage(healthMsg.state, data.buffer)
+ } else {
+ stay
+ }
+
+ case Event(memoryInfo: MemoryInfo, data: InvokerInfo) =>
+ publishHealthStatusAndStay(stateName, data.copy(memory = memoryInfo))
+
+ // in case of StatusRuntimeException: NOT_FOUND: etcdserver: requested
lease not found, we need to get the lease again.
+ case Event(t: FailureMessage, _) =>
+ logging.error(this, s"Failure happens, restart InvokerHealthManager:
${t}")
+ goto(Offline)
+ }
+
+ // It is important to note that stateName and stateData in the onTransition
callback refer to the previous state.
+ // The next state's data must be accessed via nextStateData.
+ onTransition {
+ case Offline -> Unhealthy =>
+ publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+ case Healthy -> Unhealthy =>
+ unstashAll()
+ transid.mark(
+ this,
+ LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(Unhealthy.asString),
+ s"invoker${instanceId.toInt} is unhealthy",
+ akka.event.Logging.WarningLevel)
+ startTestAction(self)
+ publishHealthStatusAndStay(Unhealthy, nextStateData)
+
+ case _ -> Healthy =>
+ logging.info(this, s"invoker became healthy, stop health action proxy.")
+ unstashAll()
+ stopTestAction()
+
+ publishHealthStatusAndStay(Healthy, nextStateData)
+
+ case oldState -> newState if oldState != newState =>
+ publishHealthStatusAndStay(newState, nextStateData)
+ unstashAll()
+ }
+
+ private def publishHealthStatusAndStay(state: InvokerState, stateData:
InvokerHealthData) = {
+ stateData match {
+ case data: InvokerInfo =>
+ val invokerResourceMessage = InvokerResourceMessage(
+ state.asString,
+ data.memory.freeMemory,
+ data.memory.busyMemory,
+ data.memory.inProgressMemory,
+ instanceId.tags,
+ instanceId.dedicatedNamespaces)
+ dataManagementService !
UpdateDataOnChange(InvokerKeys.health(instanceId),
invokerResourceMessage.serialize)
+ stay using data.copy(currentInvokerResource =
Some(invokerResourceMessage))
+
+ case data =>
+ logging.error(this, s"unexpected data is found: $data")
+ stay
+ }
+ }
+
+ initialize()
+
+ private def startTestAction(manager: ActorRef): Unit = {
+ val namespace =
InvokerHealthManager.healthActionIdentity.namespace.name.asString
+ val docId = InvokerHealthManager.healthAction(instanceId).get.docid
+
+ WhiskAction.get(entityStore, docId).onComplete {
+ case Success(action) =>
+ val initialize = Initialize(namespace,
action.toExecutableWhiskAction.get, "", 0, transid)
+ startHealthAction(initialize, manager)
+ case Failure(t) => logging.error(this, s"get health action error:
${t.getMessage}")
+ }
+ }
+
+  /**
+   * Sends the given Initialize message to the health action proxy, creating the
+   * proxy through `healthContainerProxyFactory` first when none is stored yet.
+   *
+   * @param initialize the message to deliver to the proxy
+   * @param manager the actor reference handed to the factory when a new proxy is created
+   */
+  private def startHealthAction(initialize: Initialize, manager: ActorRef): Unit = {
+    // Reuse the stored proxy if present; otherwise build one and remember it.
+    // NOTE(review): per the original author's note, re-sending Initialize to an existing
+    // proxy is meant to bring it back to Running so it can fetch activations again - confirm.
+    val target = healthActionProxy.getOrElse {
+      val created = healthContainerProxyFactory(context, manager)
+      healthActionProxy = Some(created)
+      created
+    }
+    target ! initialize
+  }
+
+  /**
+   * Stops the health test action: sends GracefulShutdown to the current health
+   * action proxy, if there is one, and clears the stored reference.
+   */
+  def stopTestAction(): Unit = {
+    // Keep the reset outside the closure. Written as `foreach { x = None; _ ! msg }`,
+    // the assignment runs once while the function literal is built, not per element,
+    // so the outcome depended on the receiver being evaluated before the argument.
+    healthActionProxy.foreach(_ ! GracefulShutdown)
+    healthActionProxy = None
+  }
+
+ /**
+ * This method is to handle health message from ContainerProxy.pub
+ * It can induce status change.
+ *
+ * @param state activation result state
+ * @param buffer RingBuffer to track status
+ * @return
+ */
+ def handleHealthMessage(state: Boolean, buffer: RingBuffer[Boolean]): State
= {
+ buffer.add(state)
+ val falseStateCount = buffer.toList.count(_ == false)
+ if (falseStateCount < InvokerHealthManager.bufferErrorTolerance) {
+ gotoIfNotThere(Healthy)
+ } else {
+ logging.warn(
+ this,
+ s"become unhealthy because system error exceeded the error tolerance,
falseStateCount $falseStateCount, errorTolerance
${InvokerHealthManager.bufferErrorTolerance}")
+ gotoIfNotThere(Unhealthy)
+ }
+ }
+
+  /**
+   * Moves the FSM to `newState` unless it is already in that state.
+   *
+   * @param newState the state the FSM should end up in
+   * @return `stay()` when the current state equals `newState`, otherwise `goto(newState)`
+   */
+  private def gotoIfNotThere(newState: InvokerState) =
+    if (stateName != newState) goto(newState) else stay()
+
+ /** Delays all incoming messages until unstashAll() is called */
+ def delay = {
+ stash()
+ stay
+ }
+
+}
+
+case class HealthActivationServiceClient() extends Actor {
+
+ private var closed: Boolean = false
+
+ override def receive: Receive = {
+ case StartClient => sender() ! ClientCreationCompleted()
+ case _: RequestActivation =>
+ InvokerHealthManager.healthActivation match {
+ case Some(activation) if !closed =>
+ sender() ! activation.copy(
+ transid = TransactionId.invokerHealthActivation,
+ activationId = ActivationId.generate())
+
+ case _ if closed =>
+ context.parent ! ClientClosed
+ context.stop(self)
+
+ case _ => // do nothing
+ }
+
+ case CloseClientProxy =>
+ closed = true
+
+ }
+}
+
+object InvokerHealthManager {
+ val healthActionNamePrefix = "invokerHealthTestAction"
+ val bufferSize = 10
+ val bufferErrorTolerance = 3
+ val healthActionIdentity: Identity = {
+ val whiskSystem = "whisk.system"
+ val uuid = UUID()
+ Identity(
+ Subject(whiskSystem),
+ Namespace(EntityName(whiskSystem), uuid),
+ BasicAuthenticationAuthKey(uuid, Secret()),
+ Set[Privilege]())
+ }
+
+ def healthAction(i: InvokerInstanceId): Option[WhiskAction] =
+ ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:default").map
{ manifest =>
+ new WhiskAction(
+ namespace =
InvokerHealthManager.healthActionIdentity.namespace.name.toPath,
+ name = EntityName(s"$healthActionNamePrefix${i.toInt}"),
+ exec = CodeExecAsString(manifest, """function main(params) { return
params; }""", None),
+ limits = ActionLimits(memory = MemoryLimit(MemoryLimit.MIN_MEMORY),
logs = LogLimit(0.B)))
+ }
+
+ var healthActivation: Option[ActivationMessage] = None
+
+ private def createTestActionForInvokerHealth(db: EntityStore, action:
WhiskAction): Future[DocInfo] = {
+ implicit val tid: TransactionId = TransactionId.invokerHealthManager
+ implicit val ec: ExecutionContext = db.executionContext
+ implicit val logging: Logging = db.logging
+
+ WhiskAction
+ .get(db, action.docid)
+ .flatMap { oldAction =>
+ WhiskAction.put(db, action.revision(oldAction.rev),
Some(oldAction))(tid, notifier = None)
+ }
+ .recoverWith {
+ case _: NoDocumentException => WhiskAction.put(db, action, old =
None)(tid, notifier = None)
+ }
+ .andThen {
+ case Success(_) => logging.info(this, "test action for invoker health
now exists")
+ case Failure(e) => logging.error(this, s"error creating test action
for invoker health: $e")
+ }
+ }
+
+  /**
+   * Loads the stored health action and caches an ActivationMessage for it in
+   * `healthActivation`.
+   *
+   * A lookup or conversion failure is logged and not propagated, so the returned
+   * future completes successfully in both cases.
+   *
+   * @param entityStore store to read the health action from
+   * @param docInfo document info of the stored health action
+   * @return a future that completes once `healthActivation` has been set or the failure has been logged
+   */
+  private def createHealthActivation(entityStore: ArtifactStore[WhiskEntity],
+                                     docInfo: DocInfo)(implicit ec: ExecutionContext, logging: Logging): Future[Unit] = {
+    implicit val transId = TransactionId.invokerHealth
+
+    WhiskAction
+      .get(entityStore, docInfo.id)
+      .map { action =>
+        healthActivation = Some(
+          ActivationMessage(
+            TransactionId.invokerHealth,
+            action.toExecutableWhiskAction.get.fullyQualifiedName(true),
+            action.rev,
+            healthActionIdentity,
+            ActivationId.generate(),
+            ControllerInstanceId("health"),
+            blocking = false,
+            content = None))
+      }
+      .recover {
+        // Same best-effort policy as before: report the problem, do not fail the caller.
+        case t => logging.error(this, s"get health action error: ${t.getMessage}")
+      }
+  }
+
+  /**
+   * Stores (or updates) the health test action for the given invoker and then
+   * caches the health activation message built from it.
+   *
+   * @param entityStore store that holds the health test action
+   * @param invokerInstanceId invoker whose health action is prepared
+   * @return a future that completes after the action has been written and the
+   *         health activation has been cached (or its lookup failure logged);
+   *         it fails if writing the action fails
+   * @throws IllegalStateException synchronously, when `healthAction` yields None
+   *                               because the "nodejs:default" runtime cannot be resolved
+   */
+  def prepare(entityStore: ArtifactStore[WhiskEntity],
+              invokerInstanceId: InvokerInstanceId)(implicit ec: ExecutionContext, logging: Logging): Future[Unit] = {
+    InvokerHealthManager.healthAction(invokerInstanceId) match {
+      case Some(action) =>
+        // flatMap rather than map: the result must not complete before the
+        // activation message is cached, otherwise callers race with that write.
+        createTestActionForInvokerHealth(entityStore, action)
+          .flatMap(docInfo => createHealthActivation(entityStore, docInfo))
+      case None =>
+        throw new IllegalStateException(
+          "cannot create test action for invoker health because runtime manifest is not valid")
+    }
+  }
+
+ def props(instanceId: InvokerInstanceId,
+ childFactory: (ActorRefFactory, ActorRef) => ActorRef,
+ dataManagementService: ActorRef,
+ entityStore: ArtifactStore[WhiskEntity])(implicit actorSystem:
ActorSystem, logging: Logging): Props = {
+ Props(new InvokerHealthManager(instanceId, childFactory,
dataManagementService, entityStore))
+ }
+}
+
+// States an Invoker can be in
+sealed trait InvokerState {
+ val asString: String
+}
+
+case object Offline extends InvokerState {
+ val asString = "down"
+}
+
+case object Healthy extends InvokerState {
+ val asString = "up"
+}
+
+case object Unhealthy extends InvokerState {
+ val asString = "unhealthy"
+}
+
+// received from ContainerProxy actor
+case class HealthMessage(state: Boolean)
+
+// received from ContainerPool actor
+case class MemoryInfo(freeMemory: Long, busyMemory: Long, inProgressMemory:
Long)
+
+// Data stored in the Invoker
+sealed class InvokerHealthData
+
+case class InvokerInfo(buffer: RingBuffer[Boolean],
+ memory: MemoryInfo = MemoryInfo(0, 0, 0),
+ currentInvokerResource: Option[InvokerResourceMessage]
= None)
+ extends InvokerHealthData
diff --git a/tests/build.gradle b/tests/build.gradle
index 2d82234..60c7b3f 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -89,6 +89,7 @@ ext.testSets = [
"REQUIRE_SCHEDULER" : [
"includes" : [
"org/apache/openwhisk/common/etcd/**",
+ "org/apache/openwhisk/core/containerpool/v2/test/**",
"org/apache/openwhisk/core/scheduler/**",
"org/apache/openwhisk/core/service/**",
]
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
new file mode 100644
index 0000000..8ab5946
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/InvokerHealthManagerTests.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.v2.test
+
+import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
+import akka.stream.ActorMaterializer
+import akka.testkit.{ImplicitSender, TestActor, TestFSMRef, TestKit, TestProbe}
+import common.StreamLogging
+import org.apache.openwhisk.common.{Enable, GracefulShutdown, RingBuffer}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.InvokerResourceMessage
+import org.apache.openwhisk.core.containerpool.v2._
+import org.apache.openwhisk.core.database.test.DbUtils
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId,
WhiskEntityStore}
+import org.apache.openwhisk.core.etcd.EtcdKV.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike,
Matchers}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+/**
+ * Tests for the InvokerHealthManager FSM.
+ *
+ * Every test creates the manager through TestFSMRef, subscribes a probe to its state
+ * transitions and checks the UpdateDataOnChange messages sent to a probe that stands in
+ * for the data management service.
+ */
+@RunWith(classOf[JUnitRunner])
+class InvokerHealthManagerTests
+    extends TestKit(ActorSystem("InvokerHealthManager"))
+    with ImplicitSender
+    with FlatSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockFactory
+    with ScalaFutures
+    with StreamLogging
+    with DbUtils {
+
+  override def afterAll = TestKit.shutdownActorSystem(system)
+
+  implicit val mt = ActorMaterializer()
+  implicit val ec = system.dispatcher
+
+  val config = new WhiskConfig(ExecManifest.requiredProperties)
+
+  ExecManifest.initialize(config) should be a 'success
+
+  val timeout = 10.seconds
+
+  val instanceId = InvokerInstanceId(0, userMemory = 1024.MB, tags = Seq("gpu-enabled", "high-memory"))
+
+  val entityStore = WhiskEntityStore.datastore()
+
+  val freeMemory: Long = 512
+
+  val busyMemory: Long = 256
+
+  val inProgressMemory: Long = 256
+
+  // Number of successful health messages the tests send to drive the manager to Healthy.
+  private val healthyPings = InvokerHealthManager.bufferSize - InvokerHealthManager.bufferErrorTolerance
+
+  /**
+   * Creates a sequence of containers and a factory returning this sequence.
+   *
+   * Note that `0 to n` is inclusive, so n + 1 probes are created; the factory hands them
+   * out one per call, in order, and ignores both of its arguments.
+   */
+  def testContainers(n: Int) = {
+    val containers = (0 to n).map(_ => TestProbe())
+    val queue = mutable.Queue(containers: _*)
+    val factory = (fac: ActorRefFactory, manager: ActorRef) => queue.dequeue().ref
+    (containers, factory)
+  }
+
+  /** Creates the manager under test, wired to the given data management probe. */
+  private def newManager(dataManagementService: TestProbe) = {
+    val (_, factory) = testContainers(InvokerHealthManager.bufferSize)
+    TestFSMRef(new InvokerHealthManager(instanceId, factory, dataManagementService.ref, entityStore))
+  }
+
+  /**
+   * Builds the UpdateDataOnChange message expected on the data management probe.
+   *
+   * @param state      invoker state whose `asString` is published
+   * @param free       expected free memory; defaults to the invoker's user memory in MB
+   * @param busy       expected busy memory; defaults to 0
+   * @param inProgress expected in-progress memory; defaults to 0
+   */
+  private def healthUpdate(state: InvokerState,
+                           free: Long = instanceId.userMemory.toMB,
+                           busy: Long = 0,
+                           inProgress: Long = 0): UpdateDataOnChange =
+    UpdateDataOnChange(
+      InvokerKeys.health(instanceId),
+      InvokerResourceMessage(
+        state.asString,
+        free,
+        busy,
+        inProgress,
+        instanceId.tags,
+        instanceId.dedicatedNamespaces).serialize)
+
+  /** Sends `healthyPings` successful health messages to `target`. */
+  private def sendHealthyPings(target: ActorRef): Unit =
+    (1 to healthyPings) foreach { _ =>
+      target ! HealthMessage(true)
+    }
+
+  /**
+   * Enables an Offline manager, feeds it successful health messages and asserts the
+   * Offline -> Unhealthy -> Healthy transitions together with the two matching updates
+   * on the data management probe.
+   */
+  private def enableAndExpectHealthy(fsm: ActorRef, probe: TestProbe, dataManagementService: TestProbe): Unit = {
+    fsm ! Enable
+    sendHealthyPings(fsm)
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+    probe.expectMsg(Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(healthUpdate(Unhealthy))
+    dataManagementService.expectMsg(healthUpdate(Healthy))
+  }
+
+  behavior of "InvokerHealthManager"
+
+  it should "invoke health action and become healthy state" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val probe = TestProbe()
+    val fsm = newManager(dataManagementService)
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    enableAndExpectHealthy(fsm, probe, dataManagementService)
+  }
+
+  it should "invoke health action again when it becomes unhealthy" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val fsm = newManager(dataManagementService)
+    val probe = TestProbe()
+    // Start directly in Healthy with a buffer pre-filled with successful entries.
+    val buffer = new RingBuffer[Boolean](InvokerHealthManager.bufferSize)
+    (1 to healthyPings) foreach { _ =>
+      buffer.add(true)
+    }
+    fsm.setState(Healthy, InvokerInfo(buffer, memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Healthy))
+    dataManagementService.expectMsg(healthUpdate(Healthy))
+
+    // One failure more than the tolerance is expected to flip the state to Unhealthy.
+    (1 to InvokerHealthManager.bufferErrorTolerance + 1) foreach { _ =>
+      fsm ! HealthMessage(false)
+    }
+
+    probe.expectMsg(Transition(fsm, Healthy, Unhealthy))
+    dataManagementService.expectMsg(healthUpdate(Unhealthy))
+  }
+
+  it should "publish healthy status and pool info to etcd together" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val fsm = newManager(dataManagementService)
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    enableAndExpectHealthy(fsm, probe, dataManagementService)
+
+    fsm ! MemoryInfo(freeMemory, busyMemory, inProgressMemory)
+    dataManagementService.expectMsg(healthUpdate(Healthy, freeMemory, busyMemory, inProgressMemory))
+  }
+
+  it should "change the invoker pool info to etcd when memory info has changes" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val fsm = newManager(dataManagementService)
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    enableAndExpectHealthy(fsm, probe, dataManagementService)
+
+    fsm ! MemoryInfo(freeMemory, busyMemory, inProgressMemory)
+
+    dataManagementService.expectMsg(healthUpdate(Healthy, freeMemory, busyMemory, inProgressMemory))
+
+    val changedFreeMemory = freeMemory - 256
+    val changedBusyMemory = busyMemory + 256
+    fsm ! MemoryInfo(changedFreeMemory, changedBusyMemory, inProgressMemory)
+
+    dataManagementService.expectMsg(
+      healthUpdate(Healthy, changedFreeMemory, changedBusyMemory, inProgressMemory))
+  }
+
+  it should "disable and enable the invoker gracefully" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val fsm = newManager(dataManagementService)
+
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    enableAndExpectHealthy(fsm, probe, dataManagementService)
+
+    fsm ! GracefulShutdown
+
+    probe.expectMsg(Transition(fsm, Healthy, Offline))
+
+    dataManagementService.expectMsg(healthUpdate(Offline))
+
+    // Replace the health action proxy with a probe so the test controls its replies.
+    val mockHealthActionProxy = TestProbe()
+    fsm.underlyingActor.healthActionProxy = Some(mockHealthActionProxy.ref)
+
+    fsm ! Enable
+
+    probe.expectMsg(Transition(fsm, Offline, Unhealthy))
+
+    // Answer Initialize with enough successful health messages; ignore GracefulShutdown.
+    mockHealthActionProxy.setAutoPilot((sender, msg) =>
+      msg match {
+        case _: Initialize =>
+          sendHealthyPings(sender)
+
+          TestActor.KeepRunning
+
+        case GracefulShutdown =>
+          TestActor.KeepRunning
+
+    })
+    probe.expectMsg(5.seconds, Transition(fsm, Unhealthy, Healthy))
+
+    dataManagementService.expectMsg(healthUpdate(Unhealthy))
+    dataManagementService.expectMsg(healthUpdate(Healthy))
+  }
+
+  it should "keep status Offline all the time in spite of receive healthMessage" in within(timeout) {
+    val dataManagementService = TestProbe()
+    val fsm = newManager(dataManagementService)
+
+    val probe = TestProbe()
+    fsm ! SubscribeTransitionCallBack(probe.ref)
+    probe.expectMsg(CurrentState(fsm, Offline))
+
+    enableAndExpectHealthy(fsm, probe, dataManagementService)
+
+    fsm ! GracefulShutdown
+
+    probe.expectMsg(Transition(fsm, Healthy, Offline))
+
+    dataManagementService.expectMsg(healthUpdate(Offline))
+
+    sendHealthyPings(fsm)
+
+    // The status must stay Offline until the invoker is explicitly enabled again.
+    probe.expectNoMessage()
+
+    val changedFreeMemory = freeMemory - 256
+    val changedBusyMemory = busyMemory + 256
+    fsm ! MemoryInfo(changedFreeMemory, changedBusyMemory, inProgressMemory)
+
+    // Even though the status is Offline, the memory info may still change during a
+    // zero-downtime deployment of the invoker, so the update must be published.
+    dataManagementService.expectMsg(
+      healthUpdate(Offline, changedFreeMemory, changedBusyMemory, inProgressMemory))
+  }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
index b0a1016..fb3a35c 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/InvokerInstanceIdTests.scala
@@ -22,7 +22,7 @@ import org.apache.openwhisk.core.entity.{ByteSize,
InstanceId, InvokerInstanceId
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
-import spray.json.{JsNumber, JsObject, JsString}
+import spray.json.{JsArray, JsNumber, JsObject, JsString}
import scala.util.Success
@@ -37,7 +37,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
i.serialize shouldBe JsObject(
"instance" -> JsNumber(i.instance),
"userMemory" -> JsString(i.userMemory.toString),
- "instanceType" -> JsString(i.instanceType)).compactPrint
+ "instanceType" -> JsString(i.instanceType),
+ "tags" -> JsArray.empty,
+ "dedicatedNamespaces" -> JsArray.empty).compactPrint
i.serialize shouldBe i.toJson.compactPrint
InstanceId.parse(i.serialize) shouldBe Success(i)
}
@@ -48,7 +50,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
"instance" -> JsNumber(i1.instance),
"userMemory" -> JsString(i1.userMemory.toString),
"instanceType" -> JsString(i1.instanceType),
- "uniqueName" -> JsString(i1.uniqueName.getOrElse(""))).compactPrint
+ "uniqueName" -> JsString(i1.uniqueName.getOrElse("")),
+ "tags" -> JsArray.empty,
+ "dedicatedNamespaces" -> JsArray.empty).compactPrint
i1.serialize shouldBe i1.toJson.compactPrint
InstanceId.parse(i1.serialize) shouldBe Success(i1)
@@ -62,7 +66,9 @@ class InvokerInstanceIdTests extends FlatSpec with Matchers {
"userMemory" -> JsString(i2.userMemory.toString),
"instanceType" -> JsString(i2.instanceType),
"uniqueName" -> JsString(i2.uniqueName.getOrElse("")),
- "displayedName" -> JsString(i2.displayedName.getOrElse(""))).compactPrint
+ "displayedName" -> JsString(i2.displayedName.getOrElse("")),
+ "tags" -> JsArray.empty,
+ "dedicatedNamespaces" -> JsArray.empty).compactPrint
i2.serialize shouldBe i2.toJson.compactPrint
InstanceId.parse(i2.serialize) shouldBe Success(i2)
}