markusthoemmes commented on a change in pull request #2531: Share bookkeeping 
data across controllers
URL: 
https://github.com/apache/incubator-openwhisk/pull/2531#discussion_r138024809
 
 

 ##########
 File path: 
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
 ##########
 @@ -17,107 +17,130 @@
 
 package whisk.core.loadBalancer
 
-import java.util.concurrent.atomic.AtomicInteger
-
+import akka.actor.ActorSystem
+import akka.util.Timeout
+import akka.pattern.ask
+import akka.event.Logging
+import whisk.core.entity.{ActivationId, UUID}
 import scala.collection.concurrent.TrieMap
-import scala.concurrent.Promise
-
-import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
-import whisk.core.entity.InstanceId
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
 
-/** Encapsulates data relevant for a single activation */
-case class ActivationEntry(id: ActivationId,
-                           namespaceId: UUID,
-                           invokerName: InstanceId,
-                           promise: Promise[Either[ActivationId, 
WhiskActivation]])
-
-/**
- * Encapsulates data used for loadbalancer and active-ack bookkeeping.
- *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
- */
-class LoadBalancerData() {
-
-  private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
-  private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
-  private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-  private val totalActivations = new AtomicInteger(0)
+trait BookkeepingData {
 
   /** Get the number of activations across all namespaces. */
-  def totalActivationCount = totalActivations.get
+  def totalActivationCount: Future[Int]
 
   /**
    * Get the number of activations for a specific namespace.
    *
    * @param namespace The namespace to get the activation count for
    * @return a map (namespace -> number of activations in the system)
    */
-  def activationCountOn(namespace: UUID) = {
-    activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
-  }
+  def activationCountOn(namespace: UUID): Future[Int]
 
   /**
-   * Get the number of activations for a specific invoker.
+   * Get the number of activations for each invoker.
    *
-   * @param invoker The invoker to get the activation count for
    * @return a map (invoker -> number of activations queued for the invoker)
    */
-  def activationCountOn(invoker: InstanceId): Int = {
-    activationByInvoker.get(invoker).map(_.get).getOrElse(0)
-  }
+  def activationCountPerInvoker: Future[Map[String, Int]]
 
   /**
    * Get an activation entry for a given activation id.
    *
    * @param activationId activation id to get data for
    * @return the respective activation or None if it doesn't exist
    */
-  def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-    activationsById.get(activationId)
-  }
+  def activationById(activationId: ActivationId): Option[ActivationEntry]
 
   /**
    * Adds an activation entry.
    *
-   * @param id identifier to deduplicate the entry
+   * @param id     identifier to deduplicate the entry
    * @param update block calculating the entry to add.
    *               Note: This is evaluated iff the entry
    *               didn't exist before.
    * @return the entry calculated by the block or iff it did
    *         exist before the entry from the state
    */
-  def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry = {
-    activationsById.getOrElseUpdate(id, {
-      val entry = update
-      totalActivations.incrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new 
AtomicInteger(0)).incrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName, new 
AtomicInteger(0)).incrementAndGet()
-      entry
-    })
-  }
+  def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry
 
   /**
    * Removes the given entry.
    *
    * @param entry the entry to remove
    * @return The deleted entry or None if nothing got deleted
    */
-  def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-    activationsById.remove(entry.id).map { x =>
-      totalActivations.decrementAndGet()
-      activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new 
AtomicInteger(0)).decrementAndGet()
-      activationByInvoker.getOrElseUpdate(entry.invokerName, new 
AtomicInteger(0)).decrementAndGet()
-      x
-    }
-  }
+  def removeActivation(entry: ActivationEntry): Option[ActivationEntry]
 
   /**
    * Removes the activation identified by the given activation id.
    *
    * @param aid activation id to remove
    * @return The deleted entry or None if nothing got deleted
    */
+  def removeActivation(aid: ActivationId): Option[ActivationEntry]
+}
+
+/**
+ * Encapsulates data used for loadbalancer and active-ack bookkeeping.
+ *
+ * Note: The state keeping is backed by distributed akka actors. All CRUDs 
operations are done on local values, thus
+ * a stale value might be read.
+ */
+class LoadBalancerData(implicit val actorSystem: ActorSystem) extends 
BookkeepingData {
 
 Review comment:
   Can we encapsulate this in its own file and name it 
`LoadBalancerDataDistributed`? (Or adopt the more common naming convention of 
putting the specialization attribute up front, as in `DistributedLoadBalancerData`.)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to