cbickel commented on a change in pull request #2531: Use akka distributed map
to store the shared state
URL:
https://github.com/apache/incubator-openwhisk/pull/2531#discussion_r137241574
##########
File path:
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerData.scala
##########
@@ -17,67 +17,74 @@
package whisk.core.loadBalancer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.Promise
-
-import whisk.core.entity.{ ActivationId, UUID, WhiskActivation }
+import akka.actor.ActorSystem
+import akka.util.Timeout
+import akka.pattern.ask
+import akka.event.Logging
+import whisk.core.entity.{ActivationId, UUID, WhiskActivation}
import whisk.core.entity.InstanceId
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
/** Encapsulates data relevant for a single activation */
case class ActivationEntry(id: ActivationId, namespaceId: UUID, invokerName:
InstanceId, promise: Promise[Either[ActivationId, WhiskActivation]])
/**
* Encapsulates data used for loadbalancer and active-ack bookkeeping.
*
- * Note: The state keeping is backed by concurrent data-structures. As such,
- * concurrent reads can return stale values (especially the counters returned).
+ * Note: The state keeping is backed by distributed akka actors. All CRUDs
operations are done on local values, thus
+ * a stale value might be read.
*/
-class LoadBalancerData() {
+class LoadBalancerData(implicit val actorSystem :ActorSystem) {
- private val activationByInvoker = TrieMap[InstanceId, AtomicInteger]()
- private val activationByNamespaceId = TrieMap[UUID, AtomicInteger]()
+ implicit val timeout = Timeout(5.seconds)
private val activationsById = TrieMap[ActivationId, ActivationEntry]()
- private val totalActivations = new AtomicInteger(0)
+ private val sharedStateInvokers =
actorSystem.actorOf(SharedDataService.props("Invokers"), name =
+ "SharedDataServiceInvokers"+UUID())
+ private val sharedStateNamespaces =
actorSystem.actorOf(SharedDataService.props("Namespaces"), name =
+ "SharedDataServiceNamespaces"+UUID())
+
+ val logging = Logging(actorSystem, sharedStateInvokers)
/** Get the number of activations across all namespaces. */
- def totalActivationCount = totalActivations.get
+ def totalActivationCount = activationsById.size
/**
* Get the number of activations for a specific namespace.
*
* @param namespace The namespace to get the activation count for
* @return a map (namespace -> number of activations in the system)
Review comment:
doesn't return a map. we should update this doc.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services