style95 commented on a change in pull request #5063:
URL: https://github.com/apache/openwhisk/pull/5063#discussion_r574209393
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
Review comment:
Leader election happens when a queue is created.
This is to guarantee only one scheduler creates a certain queue.
So it happens relatively fewer times.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
Review comment:
With the retry nature of this component, if there is a precedent
request(being retried), it would store the new request to a buffer.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterInitialData =>
+ // send WatchEndpoint first as the put operation will be retried until
success if failed
+ if (request.failoverEnabled)
+ watcherService ! WatchEndpoint(request.key, request.value, isPrefix =
false, watcherName, Set(DeleteEvent))
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterData =>
+ // send WatchEndpoint first as the put operation will be retried until
success if failed
+ if (request.failoverEnabled)
+ watcherService ! WatchEndpoint(request.key, request.value, isPrefix =
false, watcherName, Set(DeleteEvent))
+ if (inProgressKeys.contains(request.key)) {
+ // the new put|delete operation will erase influences made by older
operations like put&delete
+ // so we can remove these old operations
+ logging.info(this, s"save request $request into a buffer")
+ val queue = operations.getOrElseUpdate(request.key,
Queue.empty[Any]).filter { value =>
+ value match {
+ case _: RegisterData | _: WatcherClosed | _: RegisterInitialData
=> false
+ case _
=> true
+ }
+ }
+ queue.enqueue(request)
+ operations.update(request.key, queue)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: WatcherClosed =>
+ if (inProgressKeys.contains(request.key)) {
+ // The put|delete operations against the same key will overwrite the
previous results.
+ // For example, if we put a value, delete it and put a new value
again, the final result will be the new value.
+ // So we can remove these old operations
+ logging.info(this, s"save request $request into a buffer")
+ val queue = operations.getOrElseUpdate(request.key,
Queue.empty[Any]).filter { value =>
+ value match {
+ case _: RegisterData | _: WatcherClosed | _: RegisterInitialData
=> false
+ case _
=> true
+ }
+ }
+ queue.enqueue(request)
+ operations.update(request.key, queue)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ // It is required to close the watcher first before deleting etcd data
+ // It is supposed to receive the WatcherClosed message after the watcher
is stopped.
+ case msg: UnregisterData =>
+ watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName,
needFeedback = true)
+
+ case WatchEndpointRemoved(_, key, value, false) =>
+ self ! RegisterInitialData(key, value, failoverEnabled = false) // the
watcher is already setup
+
+ // It should not receive "prefixed" data
+ case WatchEndpointRemoved(_, key, value, true) =>
+ logging.error(this, s"unexpected data received: ${WatchEndpoint(key,
value, isPrefix = true, watcherName)}")
+
+ case msg: UpdateDataOnChange =>
Review comment:
To reduce the loads against ETCD, it does not store data if there is no
change in the value.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterInitialData =>
+ // send WatchEndpoint first as the put operation will be retried until
success if failed
+ if (request.failoverEnabled)
Review comment:
If the failover is enabled, it would watch the key and if the key is
deleted for some reason, it would try to restore it.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterInitialData =>
Review comment:
Actions under the same namespace share some data such as namespace
throttling data.
So it is required to store the data if there is no data yet but not
overwrite an existing one.
This case is for the case.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
Review comment:
Actual works would be delegated to ETCDWorker.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
Review comment:
This class is used by both schedulers and invokers to store data to ETCD.
The following kinds of data are stored to ETCD.
1. Throttling data(Action / Namespace)
2. Queue endpoint(where a queue is running)
3. Scheduler endpoint.
4. Container data(running container, warmed container, data to describe how
many containers are being created)
Dependent modules are Queue, ContainerProxy, CreationJobManager, etc.
##########
File path:
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
##########
@@ -0,0 +1,311 @@
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// it is required to specify a recipient directly for the retryable message
processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef,
watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean =
true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist,
Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps trying
until the data is stored guaranteeing delivery to ETCD.
+ * So it guarantees the data is eventually stored.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory:
ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+ // send waiting operation to worker if there is any, else update the
inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+ operations.remove(key) // remove empty queue from the map to free
memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterInitialData =>
+ // send WatchEndpoint first as the put operation will be retried until
success if failed
+ if (request.failoverEnabled)
+ watcherService ! WatchEndpoint(request.key, request.value, isPrefix =
false, watcherName, Set(DeleteEvent))
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save request $request into a buffer")
+ operations.getOrElseUpdate(request.key,
Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterData =>
Review comment:
This will overwrite the existing data in ETCD.
Generally, this is used for data that is not shared among actions.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]