This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ecb1509 [New Scheduler] Add DataManagementService (#5063)
ecb1509 is described below
commit ecb15098caded058ddb6976c630f5b6dcd656177
Author: Dominic Kim <[email protected]>
AuthorDate: Fri Mar 19 07:36:56 2021 +0900
[New Scheduler] Add DataManagementService (#5063)
* Add DataManagementService
* Update common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
Apply comment
Co-authored-by: Brendan Doyle <[email protected]>
* Apply suggestions from code review
Update comments
Co-authored-by: Brendan Doyle <[email protected]>
* Apply comments
* Add unit tests for DataManagementService
* Remove unused variable
* Add the license header
* Change Lease
* Pull docker image for the api gateway in advance
Co-authored-by: Brendan Doyle <[email protected]>
---
ansible/group_vars/all | 5 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 2 +
.../apache/openwhisk/core/etcd/EtcdClient.scala | 3 +-
.../core/service/DataManagementService.scala | 328 +++++++++++++++++++++
.../openwhisk/core/service/WatcherService.scala | 1 +
.../common/etcd/EtcdLeaderShipUnitTests.scala | 3 +-
.../core/service/DataManagementServiceTests.scala | 287 ++++++++++++++++++
tools/travis/runStandaloneTests.sh | 1 +
8 files changed, 627 insertions(+), 3 deletions(-)
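For readers skimming the patch, a minimal usage sketch (not part of the commit): it assumes an ActorSystem, a Logging instance, an EtcdClient, and already-running watcher and lease service actors, which the scheduler is expected to provide; the key and value strings below are placeholders. It must live in the org.apache.openwhisk.core.service package, since EtcdWorker is package-private.

    package org.apache.openwhisk.core.service

    import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
    import org.apache.openwhisk.common.Logging
    import org.apache.openwhisk.core.etcd.EtcdClient

    object DataManagementServiceExample {
      def start(etcdClient: EtcdClient, watcherService: ActorRef, leaseService: ActorRef)(
        implicit system: ActorSystem,
        logging: Logging): ActorRef = {
        implicit val ec = system.dispatcher
        // the factory indirection lets tests substitute a probe for the real EtcdWorker
        val workerFactory: ActorRefFactory => ActorRef = f => f.actorOf(EtcdWorker.props(etcdClient, leaseService))
        system.actorOf(DataManagementService.props(watcherService, workerFactory))
      }
    }

    // callers fire and forget; the service retries until etcd accepts the data
    // service ! RegisterData("scheduler/scheduler0", "10.0.0.1:8080")
    // service ! UpdateDataOnChange("scheduler/scheduler0", "10.0.0.1:8080") // deduplicated via dataCache
    // service ! UnregisterData("scheduler/scheduler0")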
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 555ad88..bcaba7a 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -450,3 +450,8 @@ etcd_connect_string: "{% set ret = [] %}\
{{ ret.append( hostvars[host].ansible_host + ':' + ((etcd.client.port+loop.index-1)|string) ) }}\
{% endfor %}\
{{ ret | join(',') }}"
+
+scheduler:
+ dataManagementService:
+    retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
+
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 67cf783..19ad39d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -294,4 +294,6 @@ object ConfigKeys {
val schedulerMaxPeek = "whisk.scheduler.max-peek"
val whiskClusterName = "whisk.cluster.name"
+
+  val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retryInterval"
}
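A note for reviewers: the new key resolves to the HOCON path whisk.scheduler.data-management-service.retryInterval, fed by the ansible default of "1 second" above. It is consumed with pureconfig exactly as the DataManagementService companion object below does; a minimal load sketch:

    import pureconfig.loadConfigOrThrow
    import scala.concurrent.duration.FiniteDuration
    import org.apache.openwhisk.core.ConfigKeys

    // duration strings such as "1 second" parse directly into a FiniteDuration
    val retryInterval: FiniteDuration =
      loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval)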
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
index 1142747..6d16dad 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.service.Lease
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol
@@ -34,8 +35,6 @@ import scala.language.implicitConversions
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
-case class Lease(id: Long, ttl: Long)
-
object RichListenableFuture {
  implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: ExecutionContextExecutor): Future[T] = {
val p = Promise[T]()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
new file mode 100644
index 0000000..4258326
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// messages received by the actor
+// a recipient must be specified directly for the retryable message processing
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+case class RegisterInitialData(key: String,
+ value: String,
+ failoverEnabled: Boolean = true,
+ recipient: Option[ActorRef] = None)
+
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+case class UnregisterData(key: String)
+case class UpdateDataOnChange(key: String, value: String)
+
+// messages sent by the actor
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+case class FinishWork(key: String)
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * In the event any issue occurs while storing data, the actor keeps retrying until the data is stored,
+ * guaranteeing eventual delivery to ETCD.
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends Actor {
+ private implicit val ec = context.dispatcher
+
+ implicit val requestTimeout: Timeout = Timeout(5.seconds)
+ private[service] val dataCache = TrieMap[String, String]()
+ private val operations = Map.empty[String, Queue[Any]]
+ private var inProgressKeys = Set.empty[String]
+ private val watcherName = "data-management-service"
+
+ private val worker = workerFactory(context)
+
+ override def receive: Receive = {
+ case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+ val ops = operations.get(key)
+ if (ops.nonEmpty && ops.get.nonEmpty) {
+ val operation = ops.get.dequeue()
+ worker ! operation
+ } else {
+ inProgressKeys = inProgressKeys - key
+        operations.remove(key) // remove empty queue from the map to free memory
+ }
+
+ // normally these messages will be sent when queues are created.
+ case request: ElectLeader =>
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save a request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterInitialData =>
+      // send WatchEndpoint first, as the put operation will be retried until it succeeds
+ if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+ if (inProgressKeys.contains(request.key)) {
+ logging.info(this, s"save request $request into a buffer")
+        operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: RegisterData =>
+      // send WatchEndpoint first, as the put operation will be retried until it succeeds
+ if (request.failoverEnabled)
+        watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
+ if (inProgressKeys.contains(request.key)) {
+        // a new put|delete operation supersedes the effects of older queued operations (put & delete),
+        // so we can remove those old operations
+ logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+ value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+ }
+ }
+ queue.enqueue(request)
+ operations.update(request.key, queue)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+ case request: WatcherClosed =>
+ if (inProgressKeys.contains(request.key)) {
+        // The put|delete operations against the same key will overwrite the previous results.
+        // For example, if we put a value, delete it and put a new value again, the final result will be the new value.
+ // So we can remove these old operations
+ logging.info(this, s"save request $request into a buffer")
+        val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
+ value match {
+            case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+            case _                                                           => true
+ }
+ }
+ queue.enqueue(request)
+ operations.update(request.key, queue)
+ } else {
+ worker ! request
+ inProgressKeys = inProgressKeys + request.key
+ }
+
+    // The watcher must be closed before the etcd data is deleted.
+    // A WatcherClosed message is expected once the watcher is stopped.
+ case msg: UnregisterData =>
+      watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+ case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already set up
+
+ // It should not receive "prefixed" data
+ case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+ case msg: UpdateDataOnChange =>
+ dataCache.get(msg.key) match {
+ case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data has not changed.")
+
+ case Some(cached) if cached != msg.value =>
+ dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already set up
+
+ case None =>
+ dataCache.put(msg.key, msg.value)
+ self ! RegisterData(msg.key, msg.value)
+
+ }
+ }
+}
+
+object DataManagementService {
+  val retryInterval: FiniteDuration = loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval)
+
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
+                                                                                  actorSystem: ActorSystem): Props = {
+ Props(new DataManagementService(watcherService, workerFactory))
+ }
+}
+
+private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                                  actorSystem: ActorSystem,
+                                                                                  logging: Logging)
+ extends Actor {
+
+ private val dataManagementService = context.parent
+ private var lease: Option[Lease] = None
+ leaseService ! GetLease
+
+ override def receive: Receive = {
+ case msg: Lease =>
+ lease = Some(msg)
+
+ // leader election + endpoint management
+ case request: ElectLeader =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .electLeader(request.key, request.value, l)
+ .andThen {
+ case Success(msg) =>
+ request.recipient ! ElectionResult(msg)
+ dataManagementService ! FinishWork(request.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ logging.warn(this, s"a lease is expired while leader election,
reissue it: $t")
+ lease = None
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ logging.warn(this, s"unexpected error happened: $t, retry
storing data")
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+ case None =>
+ logging.warn(this, s"lease not found, retry storing data")
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+
+ // only endpoint management
+ case request: RegisterData =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .put(request.key, request.value, l.id)
+ .andThen {
+ case Success(_) =>
+ dataManagementService ! FinishWork(request.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ logging.warn(this, s"a lease is expired while registering data
${request.key}, reissue it: $t")
+ lease = None
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ logging.warn(this, s"unexpected error happened: $t, retry
storing data ${request.key}")
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+ case None =>
+ logging.warn(this, s"lease not found, retry storing data
${request.key}")
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+
+    // it stores the data only if there is no existing entry
+ case request: RegisterInitialData =>
+ lease match {
+ case Some(l) =>
+ etcdClient
+ .putTxn(request.key, request.value, 0, l.id)
+ .map { res =>
+ dataManagementService ! FinishWork(request.key)
+ if (res.getSucceeded) {
+ logging.info(this, s"initial data storing succeeds for
${request.key}")
+ request.recipient.map(_ !
InitialDataStorageResults(request.key, Right(Done())))
+ } else {
+ logging.info(this, s"data is already stored for: $request,
cancel the initial data storing")
+ request.recipient.map(_ !
InitialDataStorageResults(request.key, Left(AlreadyExist())))
+ }
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+              logging.warn(
+                this,
+                s"the lease expired while registering initial data ${request.key}, reissue it: $t")
+ lease = None
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ logging.warn(this, s"unexpected error happened: $t, retry
storing data for ${request.key}")
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+ case None =>
+ logging.warn(this, s"lease not found, retry storing data for
${request.key}")
+ leaseService ! GetLease
+ sendMessageToSelfAfter(request, retryInterval)
+ }
+
+ case msg: WatcherClosed =>
+ etcdClient
+ .del(msg.key)
+ .andThen {
+ case Success(_) =>
+ dataManagementService ! FinishWork(msg.key)
+ }
+ .recover {
+ // if there is no lease, reissue it and retry immediately
+ case t: StatusRuntimeException =>
+ logging.warn(this, s"a lease is expired while deleting data
${msg.key}, reissue it: $t")
+ lease = None
+ leaseService ! GetLease
+ sendMessageToSelfAfter(msg, retryInterval)
+
+ // it should retry forever until the data is stored
+ case t: Throwable =>
+ logging.warn(this, s"unexpected error happened: $t, retry storing
data for ${msg.key}")
+ sendMessageToSelfAfter(msg, retryInterval)
+ }
+
+ }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+ actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
+ }
+}
+
+object EtcdWorker {
+  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
+                                                            actorSystem: ActorSystem,
+                                                            logging: Logging): Props = {
+ Props(new EtcdWorker(etcdClient, leaseService))
+ }
+}
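One contract worth calling out: whatever actor the workerFactory returns must answer each operation with FinishWork(key) to its parent, otherwise the key stays in inProgressKeys and later operations for that key queue up indefinitely. A hypothetical stand-in worker honoring the handshake (the unit tests below use the same trick), assuming it lives in the same package as the messages:

    import akka.actor.Actor

    class NoopWorker extends Actor {
      override def receive: Receive = {
        case RegisterData(key, _, _)           => context.parent ! FinishWork(key)
        case RegisterInitialData(key, _, _, _) => context.parent ! FinishWork(key)
        case WatcherClosed(key, _)             => context.parent ! FinishWork(key)
        case ElectLeader(key, _, recipient, _) =>
          // a real worker would also send an ElectionResult to the recipient
          context.parent ! FinishWork(key)
      }
    }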
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
index 3c4f8c6..edf4880 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
@@ -52,6 +52,7 @@ case class WatchEndpointInserted(override val watchKey: String,
extends WatchEndpointOperation(watchKey, key, value, isPrefix)
case class WatcherClosed(key: String, isPrefix: Boolean)
+// These are abstractions for events from ETCD.
sealed trait EtcdEvent
case object PutEvent extends EtcdEvent
case object DeleteEvent extends EtcdEvent
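For context on the new comment: DataManagementService above subscribes only to DeleteEvent when registering failover watchers. A hypothetical consumer interested in both event kinds would pass both, e.g. (key, value, and watcher name are placeholders):

    watcherService ! WatchEndpoint("scheduler/scheduler0", "10.0.0.1:8080", isPrefix = false, "my-watcher", Set(PutEvent, DeleteEvent))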
diff --git a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
index 816693a..5285c6d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
@@ -28,7 +28,8 @@ import com.ibm.etcd.client.{EtcdClient => Client}
import common.{StreamLogging, WskActorSystem}
import io.grpc.{StatusRuntimeException, Status => GrpcStatus}
import org.apache.openwhisk.core.etcd.EtcdType._
-import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, EtcdLeadershipApi, Lease}
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, EtcdLeadershipApi}
+import org.apache.openwhisk.core.service.Lease
import org.junit.runner.RunWith
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala
new file mode 100644
index 0000000..85dc174
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import common.StreamLogging
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class DataManagementServiceTests
+ extends TestKit(ActorSystem("DataManagementService"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = system.dispatcher
+
+ val schedulerId = SchedulerInstanceId("scheduler0")
+ val instanceId = schedulerId
+ val leaseService = TestProbe()
+ val watcherName = "data-management-service"
+ leaseService.setAutoPilot((sender: ActorRef, msg: Any) =>
+ msg match {
+ case GetLease =>
+ sender ! Lease(10, 10)
+ TestActor.KeepRunning
+
+ case _ =>
+ TestActor.KeepRunning
+ })
+
+ private def etcdWorkerFactory(actor: ActorRef) = { (_: ActorRefFactory) =>
+ actor
+ }
+
+ behavior of "DataManagementService"
+
+ it should "distribute work to etcd worker" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+
+ val requests = Seq(
+ RegisterData(key, value),
+ ElectLeader(key, value, self),
+ RegisterInitialData(key, value, recipient = Some(testActor)),
+ WatcherClosed(key, false))
+
+ requests.foreach { request =>
+ service ! request
+ worker.expectMsg(request)
+
+ service ! FinishWork(key)
+ }
+ }
+
+ it should "handle request sequentially for a same key" in {
+ val queue = mutable.Queue.empty[String]
+ val watcherService = TestProbe()
+ val workerFactory = (f: ActorRefFactory) =>
+ f.actorOf(Props(new Actor {
+ override def receive: Receive = {
+ case request: RegisterData =>
+ if (request.value == "async")
+ Future {
+ Thread.sleep(1000)
+ queue.enqueue(request.value)
+ context.parent ! FinishWork(request.key)
+ } else {
+ queue.enqueue(request.value)
+ context.parent ! FinishWork(request.key)
+ }
+ }
+ }))
+
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, workerFactory))
+
+    // the first request will be handled asynchronously, but as the second request has the same key,
+    // it will always be processed after the first request is finished
+ val requests = Seq(RegisterData(key, "async"), RegisterData(key, value))
+
+ requests.foreach { request =>
+ service ! request
+ }
+
+    Thread.sleep(2000) // wait until both requests are completed
+ queue.dequeue() shouldBe "async"
+    queue.dequeue() shouldBe value // the second request should wait until the first one is finished
+ queue.size shouldBe 0
+ }
+
+ it should "handle request concurrently for different keys" in {
+ val queue = mutable.Queue.empty[String]
+ val watcherService = TestProbe()
+ val workerFactory = (f: ActorRefFactory) =>
+ f.actorOf(Props(new Actor {
+ override def receive: Receive = {
+ case request: RegisterData =>
+ if (request.value == "async")
+ Future {
+ Thread.sleep(1000)
+ queue.enqueue(request.value)
+ context.parent ! FinishWork(request.key)
+ } else {
+ queue.enqueue(request.value)
+ context.parent ! FinishWork(request.key)
+ }
+ }
+ }))
+
+ val key = "testKey"
+ val key2 = "testKey2"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, workerFactory))
+
+ val requests = Seq(RegisterData(key, "async"), RegisterData(key2, value))
+
+ requests.foreach { request =>
+ service ! request
+ }
+
+    Thread.sleep(2000) // wait until both requests are completed
+    queue.dequeue() shouldBe value // the second request should be completed first because it doesn't wait
+ queue.dequeue() shouldBe "async"
+ queue.size shouldBe 0
+ }
+
+ it should "remove unnecessary operation" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+
+ service ! RegisterData(key, value) // occupy the resource
+ worker.expectMsg(RegisterData(key, value))
+
+    service ! RegisterInitialData(key, value) // this request should also be removed
+
+ val requests = Random.shuffle(
+      Seq(RegisterData(key, value), RegisterData(key, value), WatcherClosed(key, false), WatcherClosed(key, false)))
+    // the requests below will be merged into one and wait in the queue
+ requests.foreach { request =>
+ service ! request
+ }
+ worker.expectNoMessage()
+
+ service ! FinishWork(key) // release the resource
+
+ worker.expectMsg(requests(3)) // only the last one should be distributed
+ }
+
+ it should "register data when the target endpoint is removed" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+
+    // no new watcher is registered, as one is assumed to be registered already
+ service ! WatchEndpointRemoved(key, key, value, isPrefix = false)
+ worker.expectMsg(RegisterInitialData(key, value, false, None))
+ }
+
+ it should "ignore prefixed endpoint-removed results" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+ service ! WatchEndpointRemoved("", key, value, isPrefix = true)
+
+ worker.expectNoMessage()
+ }
+
+ it should "deregister data" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ watcherService.setAutoPilot((sender, msg) => {
+ msg match {
+ case UnwatchEndpoint(key, isPrefix, _, _) =>
+ sender ! WatcherClosed(key, isPrefix)
+ TestActor.KeepRunning
+ }
+ })
+ val key = "testKey"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+ service ! UnregisterData(key)
+
+    watcherService.expectMsg(UnwatchEndpoint(key, isPrefix = false, watcherName, true))
+ worker.expectMsg(WatcherClosed(key, false))
+ }
+
+ it should "store the resource data" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+ service ! UpdateDataOnChange(key, value)
+
+ worker.expectMsg(RegisterData(key, value))
+ service.underlyingActor.dataCache.size shouldBe 1
+ }
+
+ it should "not store the resource data if there is no change from the last
one" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ val key = "testKey"
+ val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+ service ! UpdateDataOnChange(key, value)
+
+ worker.expectMsg(RegisterData(key, value))
+ service.underlyingActor.dataCache.size shouldBe 1
+
+ service ! UpdateDataOnChange(key, value)
+ worker.expectNoMessage()
+ service.underlyingActor.dataCache.size shouldBe 1
+ }
+
+ it should "store the resource data if there is change from the last one" in {
+ val watcherService = TestProbe()
+ val worker = TestProbe()
+ val key = "testKey"
+ val value = "testValue"
+ val newValue = "newTestValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, etcdWorkerFactory(worker.ref)))
+ service ! UpdateDataOnChange(key, value)
+
+ worker.expectMsg(RegisterData(key, value))
+ service.underlyingActor.dataCache.size shouldBe 1
+
+ service ! FinishWork(key)
+ service ! UpdateDataOnChange(key, newValue)
+ worker.expectMsg(RegisterData(key, newValue, false))
+ service.underlyingActor.dataCache.size shouldBe 1
+ }
+}
diff --git a/tools/travis/runStandaloneTests.sh b/tools/travis/runStandaloneTests.sh
index 0d11294..144dbbb 100755
--- a/tools/travis/runStandaloneTests.sh
+++ b/tools/travis/runStandaloneTests.sh
@@ -47,6 +47,7 @@ kubectl config set-context --current --namespace=default
# This is required because pulling the image during the test times out.
docker pull openwhisk/action-nodejs-v10:nightly
docker pull openwhisk/dockerskeleton:nightly
+docker pull openwhisk/apigateway:0.11.0
cd $ROOTDIR
TERM=dumb ./gradlew :core:standalone:build \