markusthoemmes commented on a change in pull request #2531: Use akka 
distributed map to store the shared state
URL: https://github.com/apache/incubator-openwhisk/pull/2531#discussion_r129620993
 
 

 ##########
 File path: 
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
 ##########
 @@ -31,82 +38,77 @@ case class ActivationEntry(id: ActivationId, namespaceId: 
UUID, invokerName: Ins
  * Note: The state keeping is backed by concurrent data-structures. As such,
  * concurrent reads can return stale values (especially the counters returned).
  */
-class LoadBalancerData() {
+class LoadBalancerData(invokersMapName :String, namespacesMapName :String, 
implicit val actorSystem :ActorSystem) {
 
-    type TrieSet[T] = TrieMap[T, Unit]
+  type TrieSet[T] = TrieMap[T, Unit]
 
-    private val activationByInvoker = new TrieMap[InstanceId, 
TrieSet[ActivationEntry]]
-    private val activationByNamespaceId = new TrieMap[UUID, 
TrieSet[ActivationEntry]]
-    private val activationsById = new TrieMap[ActivationId, ActivationEntry]
+  private val activationsById = new TrieMap[ActivationId, ActivationEntry]
 
-    /**
-     * Get the number of activations for each namespace.
-     *
-     * @return a map (namespace -> number of activations in the system)
-     */
-    def activationCountByNamespace: Map[UUID, Int] = {
-        activationByNamespaceId.toMap.mapValues(_.size)
-    }
+  val props = Props[SharedDataService]
 
-    /**
-     * Get the number of activations for each invoker.
-     *
-     * @return a map (invoker -> number of activations queued for the invoker)
-     */
-    def activationCountByInvoker: Map[InstanceId, Int] = {
-        activationByInvoker.toMap.mapValues(_.size)
-    }
+  implicit val timeout = Timeout(5.seconds)
 
-    /**
-     * Get an activation entry for a given activation id.
-     *
-     * @param activationId activation id to get data for
-     * @return the respective activation or None if it doesn't exist
-     */
-    def activationById(activationId: ActivationId): Option[ActivationEntry] = {
-        activationsById.get(activationId)
-    }
+  val sharedStateInvokers = actorSystem.actorOf(props, name = invokersMapName)
+  val sharedStateNamespaces = actorSystem.actorOf(props, name = 
namespacesMapName)
 
-    /**
-     * Adds an activation entry.
-     *
-     * @param id identifier to deduplicate the entry
-     * @param update block calculating the entry to add.
-     *               Note: This is evaluated iff the entry
-     *               didn't exist before.
-     * @return the entry calculated by the block or iff it did
-     *         exist before the entry from the state
-     */
-    def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry = {
-        activationsById.getOrElseUpdate(id, {
-            val entry = update
-            activationByNamespaceId.getOrElseUpdate(entry.namespaceId, new 
TrieSet[ActivationEntry]).put(entry, {})
-            activationByInvoker.getOrElseUpdate(entry.invokerName, new 
TrieSet[ActivationEntry]).put(entry, {})
-            entry
-        })
-    }
+  /**
+    * Get the number of activations for each namespace.
+    *
+    * @return a map (namespace -> number of activations in the system)
+    */
+  def activationCountByNamespace: Future[Map[String, BigInt]] = {
+    (sharedStateNamespaces ? GetTheMap()).mapTo[MapWithCounters].map(_.dataMap)
+  }
 
-    /**
-     * Removes the given entry.
-     *
-     * @param entry the entry to remove
-     * @return The deleted entry or None if nothing got deleted
-     */
-    def removeActivation(entry: ActivationEntry): Option[ActivationEntry] = {
-        activationsById.remove(entry.id).map { x =>
-            activationByNamespaceId.getOrElseUpdate(x.namespaceId, new 
TrieSet[ActivationEntry]).remove(entry)
-            activationByInvoker.getOrElseUpdate(x.invokerName, new 
TrieSet[ActivationEntry]).remove(entry)
-            x
-        }
-    }
+  /**
+    * Get the number of activations for each invoker.
+    *
+    * @return a map (invoker -> number of activations queued for the invoker)
+    */
+  def activationCountByInvoker: Future[Map[String, BigInt]] = {
+    (sharedStateInvokers ? GetTheMap()).mapTo[MapWithCounters].map(_.dataMap)
+  }
 
-    /**
-     * Removes the activation identified by the given activation id.
-     *
-     * @param aid activation id to remove
-     * @return The deleted entry or None if nothing got deleted
-     */
-    def removeActivation(aid: ActivationId): Option[ActivationEntry] = {
-        activationsById.get(aid).flatMap(removeActivation)
+  /**
+    * Adds an activation entry.
+    *
+    * @param id     identifier to deduplicate the entry
+    * @param update block calculating the entry to add.
+    *               Note: This is evaluated iff the entry
+    *               didn't exist before.
+    * @return the entry calculated by the block or iff it did
+    *         exist before the entry from the state
+    */
+  def putActivation(id: ActivationId, update: => ActivationEntry): 
ActivationEntry = {
+    activationsById.getOrElseUpdate(id, {
+      val entry = update
+      sharedStateNamespaces ! IncreaseCounter(entry.namespaceId.asString, 1)
 
 Review comment:
   1. The load balancer tracks the number of concurrent invocations per
namespace (needed for throttling).
   2. The load balancer tracks the load per invoker so that it can balance the
load with the least queuing possible.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to