cbickel commented on a change in pull request #2531: Use akka distributed map 
to store the shared state
URL: 
https://github.com/apache/incubator-openwhisk/pull/2531#discussion_r137241138
 
 

 ##########
 File path: 
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
 ##########
 @@ -17,67 +17,74 @@
 
 package whisk.core.loadBalancer
 
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Promise
-
-import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import akka.actor.ActorSystem
+import akka.util.Timeout
+import akka.pattern.ask
+import akka.event.Logging
+import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
 import whisk.core.entity.InstanceId
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
 
 /** Encapsulates data relevant for a single activation */
 case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName: 
InstanceId, promise: Promise[Either[ActivationId, WhiskActivation]])
 
 /**
  * Encapsulates data used for loadbalancer and active-ack bookkeeping.
  *
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
+ * Note: The state keeping is backed by distributed akka actors. All CRUDs 
operations are done on local values, thus
+  * a stale value might be read.
  */
-class LoadBalancerData() {
+class LoadBalancerData(implicit val actorSystem :ActorSystem) {
 
-    private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
-    private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+    implicit val timeout = Timeout(5.seconds)
     private val activationsById = TrieMap[ActivationId, ActivationEntry]()
-    private val totalActivations = new AtomicInteger(0)
+    private val sharedStateInvokers = 
actorSystem.actorOf(SharedDataService.props("Invokers"), name =
+      "SharedDataServiceInvokers"+UUID())
+    private val sharedStateNamespaces = 
actorSystem.actorOf(SharedDataService.props("Namespaces"), name =
+      "SharedDataServiceNamespaces"+UUID())
+
+    val logging = Logging(actorSystem, sharedStateInvokers)
 
     /** Get the number of activations across all namespaces. */
-    def totalActivationCount = totalActivations.get
+    def totalActivationCount = activationsById.size
 
 Review comment:
   Are these only the activations of the current Controller, or is this local 
map updated somewhere else?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to