bdoyle0182 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574209680



##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message 
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, 
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = 
true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, 
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying 
until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: 
ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the 
inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free 
memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       So I'm still learning what this is doing, but what does each request 
involve? Is it every activation or some sort of metadata setup? If it's every 
activation it would seem spammy to me otherwise I think it's fine




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to