ningyougang commented on a change in pull request #5061:
URL: https://github.com/apache/openwhisk/pull/5061#discussion_r570691563



##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes, i tested in my local, after removed `the override def equals 
and the override def hashCode`, works well both, e.g.
   for two InvokerResourceMessage object,
   * If all field value is same, object1.equals(object) will be `true`, 
   * if any one filed value is different,  object1.equals(object) will be 
`false` 

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =

Review comment:
       Ah, yes, i tested in my local, after removed `the override def equals 
and the override def hashCode`, works well both, e.g.
   for two InvokerResourceMessage object,
   * If all field value is same, object1.equals(object) will be `true`, 
   * if any one filed value is different,  object1.equals(object) will be 
`false` 
   
   I already removed `the override def equals and the override def hashCode` in 
this pr

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/entity/InstanceId.scala
##########
@@ -29,11 +29,18 @@ import scala.util.Try
  * @param instance a numeric value used for the load balancing and Kafka topic 
creation
  * @param uniqueName an identifier required for dynamic instance assignment by 
Zookeeper
  * @param displayedName an identifier that is required for the health protocol 
to correlate Kafka topics with invoker container names
+ * @param userMemory invoker user memory
+ * @param busyMemory invoker busy memory
+ * @param tags actions which included specified annotation tags can be run on 
this invoker
+ * @param dedicatedNamespaces only dedicatedNamespaces's actions can be run on 
this invoker
  */
 case class InvokerInstanceId(val instance: Int,
                              uniqueName: Option[String] = None,
                              displayedName: Option[String] = None,
-                             val userMemory: ByteSize)
+                             val userMemory: ByteSize,

Review comment:
       Good question.
   
   I tested in my local, doesn't affect the message bus.  This pr's invoker in 
upsteam master's controller's healthy status is `up`.
   But your said problem i meet before, in that time, it seems the PingMessage 
is changed, so lead to the invoker of new codes in controller of old code's 
healthy status is `unhealthy`. if this issue comes. we solved it using another 
deployment method, e.g.
   * Remove half controller from nginx.
   * Disable half invoker
   * Deploy half controller/invoker using new codes
   * Add the half controller nginx.
   * Deploy another half components using above steps

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],
+                                  dedicatedNamespaces: Seq[String])
+    extends Message {
+
+  /**
+   * Serializes message to string. Must be idempotent.
+   */
+  override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
+
+  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: InvokerResourceMessage => {
+        that.canEqual(this) &&
+        this.status == that.status &&
+        this.freeMemory == that.freeMemory &&
+        this.busyMemory == that.busyMemory &&
+        this.inProgressMemory == that.inProgressMemory &&
+        this.tags.toSet == that.tags.toSet
+        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
+      }
+      case _ => false
+    }
+
+  override def hashCode: Int = {
+    var result = 1;

Review comment:
       I already removed the equals and hashCode.

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
##########
@@ -0,0 +1,89 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import java.time.Instant
+
+import akka.actor.ActorRef
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.containerpool.Container
+import org.apache.openwhisk.core.entity.{ByteSize, CodeExec, DocRevision, 
ExecutableWhiskAction}
+import org.apache.openwhisk.core.entity.size._
+
+// Events received by the actor
+case class Initialize(invocationNamespace: String,
+                      action: ExecutableWhiskAction,
+                      schedulerHost: String,
+                      rpcPort: Int,
+                      transId: TransactionId)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+
+// Event sent by the actor
+case class ContainerCreationFailed(throwable: Throwable)
+case class ContainerIsPaused(data: WarmData)
+case class ClientCreationFailed(throwable: Throwable,
+                                container: Container,
+                                invocationNamespace: String,
+                                action: ExecutableWhiskAction)
+case class ReadyToWork(data: Data)
+case class Initialized(data: InitializedData)
+case class Resumed(data: WarmData)
+case class ResumeFailed(data: WarmData)
+case class RecreateClient(action: ExecutableWhiskAction)
+case object ContainerRemoved // when container is destroyed
+
+// States
+sealed trait ProxyState
+case object LeaseStart extends ProxyState
+case object Uninitialized extends ProxyState
+case object ContainerCreating extends ProxyState
+case object ContainerCreated extends ProxyState
+case object ClientCreating extends ProxyState
+case object ClientCreated extends ProxyState
+case object Running extends ProxyState
+case object Pausing extends ProxyState
+case object Paused extends ProxyState
+case object Removing extends ProxyState
+case object Rescheduling extends ProxyState
+
+// Data
+sealed abstract class Data(val memoryLimit: ByteSize) {
+  def getContainer: Option[Container]
+}
+case class NoExistData() extends Data(0.B) {

Review comment:
       Updated accordingly

##########
File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/InvokerHealthyManager.scala
##########
@@ -0,0 +1,379 @@
+package org.apache.openwhisk.core.containerpool.v2
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, 
Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{ArtifactStore, NoDocumentException}
+import org.apache.openwhisk.core.entitlement.Privilege
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.types.EntityStore
+import org.apache.openwhisk.core.entity.{ActivationResponse => _, _}
+import org.apache.openwhisk.core.etcd.InvokerKeys
+import org.apache.openwhisk.core.service.UpdateDataOnChange
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class InvokerHealthManager(instanceId: InvokerInstanceId,
+                           healthContainerProxyFactory: (ActorRefFactory, 
ActorRef) => ActorRef,
+                           dataManagementService: ActorRef,
+                           entityStore: ArtifactStore[WhiskEntity])(implicit 
actorSystem: ActorSystem, logging: Logging)
+    extends FSM[InvokerState, InvokerHealthData]
+    with Stash {
+
+  implicit val requestTimeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val transid: TransactionId = TransactionId.invokerHealth
+
+  private[containerpool] var healthActionProxy: Option[ActorRef] = None
+
+  startWith(
+    Offline,
+    InvokerInfo(
+      new RingBuffer[Boolean](InvokerHealthManager.bufferSize),
+      memory = MemoryInfo(instanceId.userMemory.toMB, 0, 0)))
+
+  when(Offline) {
+    case Event(GracefulShutdown, _: InvokerInfo) =>
+      logging.warn(this, "Received a graceful shutdown flag, stopping the 
invoker.")
+      stay
+
+    case Event(Enable, _) =>
+      InvokerHealthManager.prepare(entityStore, instanceId).map { _ =>
+        startTestAction(self)
+      }
+
+      goto(UnHealthy)
+  }
+
+  when(UnHealthy) {

Review comment:
       Updated accordingly

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       Ok, i will check

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removed InvokerResourceMessage's hashCode and equals, i tested 
below scenes
   
   for two InvokerResourceMessage object,
   * If all field value is same, object1.equals(object) will be `true`, 
   * if any one filed value is different,  object1.equals(object) will be 
`false` 

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removed InvokerResourceMessage's hashCode and equals, i tested 
below scenes
   
   for two InvokerResourceMessage object,
   * If all field value is same, object1.equals(object) will be `true`, 
   * if any one filed value is different,  object1.equals(object) will be 
`false`
   
   For why added the override hashCode and equals.
   Seems have no specifal reason 

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -440,34 +440,6 @@ case class InvokerResourceMessage(status: String,
    * Serializes message to string. Must be idempotent.
    */
   override def serialize: String = 
InvokerResourceMessage.serdes.write(this).compactPrint
-
-  def canEqual(a: Any) = a.isInstanceOf[InvokerResourceMessage]
-
-  override def equals(that: Any): Boolean =
-    that match {
-      case that: InvokerResourceMessage => {
-        that.canEqual(this) &&
-        this.status == that.status &&
-        this.freeMemory == that.freeMemory &&
-        this.busyMemory == that.busyMemory &&
-        this.inProgressMemory == that.inProgressMemory &&
-        this.tags.toSet == that.tags.toSet
-        this.dedicatedNamespaces.toSet == that.dedicatedNamespaces.toSet
-      }
-      case _ => false
-    }
-
-  override def hashCode: Int = {

Review comment:
       After removed InvokerResourceMessage's hashCode and equals, i tested 
below scenes
   
   for two InvokerResourceMessage object,
   * If all field value is same, object1.equals(object) will be `true`, 
   * if any one filed value is different,  object1.equals(object) will be 
`false`
   
   Regarding `why added the override hashCode and equals in our code`
   Seems have no specifal reason 

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for some special actions. e.g. need high memory, need power 
gpu.
   We can make the action's invocations run on corresponding invokers
   
   * Firstly, add some annotation to the actions, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation  invoker-resources 
["gpu"]
   ```
   * Secondly, when deploy invoker, add the relative tag to the invoker as well
   ```
   whisk/invokers/0/0
   
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":['gpu']}
   whisk/invokers/1/1
   
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when run the action, the activations for that action will run on 
above invoker0 which includes tag: gpu

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, dedicatedNamespaces means dedicatedNamespaces's all actions ran on 
corresponding invoker.

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       This is used for some special actions. e.g. need high memory, need power 
gpu.
   We can make the action's invocations run on corresponding invokers (BTW, 
this requirement comes from user in our company)
   
   * Firstly, add some annotation to the actions, e.g.
   
   ```
   wsk action create hello-gpu ~/hello-gpu.js --annotation  invoker-resources 
["gpu"]
   ```
   * Secondly, when deploy invoker, add the relative tag to the invoker as well
   ```
   whisk/invokers/0/0
   
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":['gpu']}
   whisk/invokers/1/1
   
{"busyMemory":0,"dedicatedNamespaces":[],"freeMemory":10240,"inProgressMemory":0,"status":"up","tags":[]}
   ```
   * Finally, when run the action, the activations for that action will run on 
above invoker0 which includes tag: gpu

##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
##########
@@ -428,6 +428,61 @@ object EventMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(format.read(msg.parseJson))
 }
 
+case class InvokerResourceMessage(status: String,
+                                  freeMemory: Long,
+                                  busyMemory: Long,
+                                  inProgressMemory: Long,
+                                  tags: Seq[String],

Review comment:
       BTW, dedicatedNamespaces means dedicatedNamespaces's all actions run on 
corresponding invoker.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to