vvraskin commented on a change in pull request #2531: Use akka distributed map 
to store the shared state
URL: 
https://github.com/apache/incubator-openwhisk/pull/2531#discussion_r137289493
 
 

 ##########
 File path: 
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
 ##########
 @@ -17,67 +17,74 @@
 
 package whisk.core.loadBalancer
 
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Promise
-
-import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import akka.actor.ActorSystem
+import akka.util.Timeout
+import akka.pattern.ask
+import akka.event.Logging
+import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
 import whisk.core.entity.InstanceId
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
 
 /** Encapsulates data relevant for a single activation */
 case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: 
InstanceId, promise: Promise[Either[ActivationId, WhiskActivation]])
 
 /**
  * Encapsulates data used for loadbalancer and active-ack bookkeeping.
  *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
+ * Note: The state keeping is backed by distributed akka actors. All CRUDs 
operations are done on local values, thus
+  * a stale value might be read.
  */
-class LoadBalancerData() {
+class LoadBalancerData(implicit val actorSystem :ActorSystem) {
 
-    private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
-    private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+    implicit val timeout = Timeout(5.seconds)
     private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-    private val totalActivations = new AtomicInteger(0)
+    private val sharedStateInvokers = 
actorSystem.actorOf(SharedDataService.props("Invokers"), name =
+      "SharedDataServiceInvokers"+UUID())
+    private val sharedStateNamespaces = 
actorSystem.actorOf(SharedDataService.props("Namespaces"), name =
+      "SharedDataServiceNamespaces"+UUID())
+
+    val logging = Logging(actorSystem, sharedStateInvokers)
 
     /** Get the number of activations across all namespaces. */
-    def totalActivationCount = totalActivations.get
+    def totalActivationCount = activationsById.size
 
     /**
      * Get the number of activations for a specific namespace.
      *
      * @param namespace The namespace to get the activation count for
      * @return a map (namespace -> number of activations in the system)
      */
-    def activationCountOn(namespace: UUID) = {
-        activationByNamespaceId.get(namespace).map(_.get).getOrElse(0)
+    def activationCountOn(namespace: UUID): Future[Int] = {
+        (sharedStateNamespaces ? 
GetTheMap()).mapTo[MapWithCounters].map(_.dataMap.mapValues(_.toInt).getOrElse
+        (namespace.toString, 0))
     }
 
     /**
-     * Get the number of activations for a specific invoker.
+     * Get the number of activations for each invoker.
      *
-     * @param invoker The invoker to get the activation count for
      * @return a map (invoker -> number of activations queued for the invoker)
      */
-    def activationCountOn(invoker: InstanceId): Int = {
-        activationByInvoker.get(invoker).map(_.get).getOrElse(0)
+    def activationCountPerInvoker: Future[Map[String, BigInt]] = {
 
 Review comment:
   Unfortunately, with the current Akka implementation of PNCounterMap we can only
get the whole map. That is OK here, since we read it from the local provider (no
lookup on other cluster nodes is needed). In theory I could keep the method as it
was, returning the activation count for a single invoker. However, I consider it
too much overhead to ask the actor for the same map several times.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to