style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574196400



##########
File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,313 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message 
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, 
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = 
true)
+case class DeRegisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, 
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In case there is any issue occurred while storing data, it keeps trying 
until the data is stored.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: 
ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  private[service] val dataCache = TrieMap[String, String]()
+  private val operations = Map.empty[String, Queue[Any]]
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the 
inProgressKeys
+      val ops = operations.get(key)
+      if (ops.nonEmpty && ops.get.nonEmpty) {
+        val operation = ops.get.dequeue()
+        worker ! operation
+      } else {
+        inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free 
memories
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      if (inProgressKeys.contains(request.key)) {
+        logging.info(this, s"save request $request into a buffer")

Review comment:
       In this case, the request is stored in a buffer because a preceding 
request for the same key is already being processed. If any issue happens, this 
log would let us know whether the request has been processed or not.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to