This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ecb1509  [New Scheduler] Add DataManagementService (#5063)
ecb1509 is described below

commit ecb15098caded058ddb6976c630f5b6dcd656177
Author: Dominic Kim <[email protected]>
AuthorDate: Fri Mar 19 07:36:56 2021 +0900

    [New Scheduler] Add DataManagementService (#5063)
    
    * Add DataManagementService
    
    * Update 
common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
    
    Apply comment
    
    Co-authored-by: Brendan Doyle <[email protected]>
    
    * Apply suggestions from code review
    
    Update comments
    
    Co-authored-by: Brendan Doyle <[email protected]>
    
    * Apply comments
    
    * Add unit tests for DataManagementService
    
    * Remove unused variable
    
    * Add the license header
    
    * Change Lease
    
    * Pull docker image for the api gateway in advance
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 ansible/group_vars/all                             |   5 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   2 +
 .../apache/openwhisk/core/etcd/EtcdClient.scala    |   3 +-
 .../core/service/DataManagementService.scala       | 328 +++++++++++++++++++++
 .../openwhisk/core/service/WatcherService.scala    |   1 +
 .../common/etcd/EtcdLeaderShipUnitTests.scala      |   3 +-
 .../core/service/DataManagementServiceTests.scala  | 287 ++++++++++++++++++
 tools/travis/runStandaloneTests.sh                 |   1 +
 8 files changed, 627 insertions(+), 3 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 555ad88..bcaba7a 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -450,3 +450,8 @@ etcd_connect_string: "{% set ret = [] %}\
                         {{ ret.append( hostvars[host].ansible_host + ':' + 
((etcd.client.port+loop.index-1)|string) ) }}\
                       {% endfor %}\
                       {{ ret | join(',') }}"
+
+# Settings for the scheduler's DataManagementService.
+# The default must be a quoted string: an unquoted `1 second` is not a valid Jinja2 expression.
+scheduler:
+  dataManagementService:
+    retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
+
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 67cf783..19ad39d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -294,4 +294,6 @@ object ConfigKeys {
   val schedulerMaxPeek = "whisk.scheduler.max-peek"
 
   val whiskClusterName = "whisk.cluster.name"
+
+  val dataManagementServiceRetryInterval = 
"whisk.scheduler.data-management-service.retryInterval"
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
index 1142747..6d16dad 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/etcd/EtcdClient.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors
 
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.service.Lease
 import pureconfig.loadConfigOrThrow
 import spray.json.DefaultJsonProtocol
 
@@ -34,8 +35,6 @@ import scala.language.implicitConversions
 import scala.annotation.tailrec
 import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
 
-case class Lease(id: Long, ttl: Long)
-
 object RichListenableFuture {
   implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: 
ExecutionContextExecutor): Future[T] = {
     val p = Promise[T]()
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
new file mode 100644
index 0000000..4258326
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/DataManagementService.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.util.Timeout
+import io.grpc.StatusRuntimeException
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
+import org.apache.openwhisk.core.service.DataManagementService.retryInterval
+import pureconfig.loadConfigOrThrow
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable.{Map, Queue}
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.util.Success
+
+// Messages received by the DataManagementService actor.
+// It is required to specify a recipient directly for the retryable message processing.
+
+// Requests a leader election for the key. EtcdWorker calls etcdClient.electLeader with the current lease
+// and sends the resulting ElectionResult to `recipient`.
+// NOTE(review): `watchEnabled` is not read by any code in this file - confirm where it is consumed.
+case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
+
+// Stores the value only when the key does not exist yet (EtcdWorker uses etcdClient.putTxn).
+// When `failoverEnabled` is true, DataManagementService first asks the watcher service to watch the key
+// for DeleteEvent. The outcome is sent to `recipient` as InitialDataStorageResults when it is defined.
+case class RegisterInitialData(key: String,
+                               value: String,
+                               failoverEnabled: Boolean = true,
+                               recipient: Option[ActorRef] = None)
+
+// Stores the value for the key (EtcdWorker uses etcdClient.put with the current lease id).
+// When `failoverEnabled` is true, DataManagementService first asks the watcher service to watch the key
+// for DeleteEvent.
+case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
+
+// Asks DataManagementService to close the watcher for the key; the data is deleted once the resulting
+// WatcherClosed message is handled.
+case class UnregisterData(key: String)
+
+// Stores the value only when it differs from the value cached by DataManagementService for the key.
+case class UpdateDataOnChange(key: String, value: String)
+
+// Messages sent by the actors in this file.
+
+// Result of a leader election, sent to the recipient of the ElectLeader request.
+case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
+
+// Sent by the worker to DataManagementService when the operation for the key is completed.
+case class FinishWork(key: String)
+
+// Result of a RegisterInitialData request: Right(Done) when the data is stored,
+// Left(AlreadyExist) when the transaction did not succeed because data was already stored.
+case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
+case class Done()
+case class AlreadyExist()
+
+/**
+ * This service is in charge of storing given data to ETCD.
+ * The requests are handed over to a worker actor created by `workerFactory`. In the event any issue occurs
+ * while storing data, the worker keeps retrying until the data is stored, so the data is eventually stored.
+ *
+ * Requests for the same key are processed one at a time: while a key is in progress, further requests for it
+ * are buffered and the next one is sent to the worker when [[FinishWork]] is received for that key.
+ * Requests for different keys are sent to the worker without waiting for each other.
+ *
+ * @param watcherService the actor which receives WatchEndpoint and UnwatchEndpoint requests
+ * @param workerFactory creates the worker actor; it is invoked once with the context of this actor
+ */
+class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends Actor {
+  private implicit val ec = context.dispatcher
+
+  implicit val requestTimeout: Timeout = Timeout(5.seconds)
+  // the last value received via UpdateDataOnChange for each key
+  private[service] val dataCache = TrieMap[String, String]()
+  // requests waiting for the in-progress operation of the same key to finish
+  private val operations = Map.empty[String, Queue[Any]]
+  // keys for which a request has been sent to the worker and FinishWork is not received yet
+  private var inProgressKeys = Set.empty[String]
+  private val watcherName = "data-management-service"
+
+  private val worker = workerFactory(context)
+
+  override def receive: Receive = {
+    case FinishWork(key) =>
+      // send waiting operation to worker if there is any, else update the inProgressKeys
+      operations.get(key) match {
+        case Some(pending) if pending.nonEmpty =>
+          worker ! pending.dequeue()
+
+        case _ =>
+          inProgressKeys = inProgressKeys - key
+          operations.remove(key) // remove empty queue from the map to free memory
+      }
+
+    // normally these messages will be sent when queues are created.
+    case request: ElectLeader =>
+      dispatchOrElse(request.key, request) {
+        logging.info(this, s"save a request $request into a buffer")
+        enqueue(request.key, request)
+      }
+
+    case request: RegisterInitialData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled) watchDeletion(request.key, request.value)
+      dispatchOrElse(request.key, request) {
+        logging.info(this, s"save request $request into a buffer")
+        enqueue(request.key, request)
+      }
+
+    case request: RegisterData =>
+      // send WatchEndpoint first as the put operation will be retried until success if failed
+      if (request.failoverEnabled) watchDeletion(request.key, request.value)
+      dispatchOrElse(request.key, request) {
+        logging.info(this, s"save request $request into a buffer")
+        enqueueSuperseding(request.key, request)
+      }
+
+    case request: WatcherClosed =>
+      dispatchOrElse(request.key, request) {
+        logging.info(this, s"save request $request into a buffer")
+        enqueueSuperseding(request.key, request)
+      }
+
+    // It is required to close the watcher first before deleting etcd data
+    // It is supposed to receive the WatcherClosed message after the watcher is stopped.
+    case msg: UnregisterData =>
+      // Forget the cached value as well. Otherwise a later UpdateDataOnChange with the same value would be
+      // skipped, and one with a new value would assume that the (already closed) watcher is still set up.
+      dataCache.remove(msg.key)
+      watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+
+    case WatchEndpointRemoved(_, key, value, false) =>
+      self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
+
+    // It should not receive "prefixed" data
+    case WatchEndpointRemoved(_, key, value, true) =>
+      logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")
+
+    case msg: UpdateDataOnChange =>
+      dataCache.get(msg.key) match {
+        case Some(cached) if cached == msg.value =>
+          logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
+
+        case Some(_) =>
+          dataCache.update(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup
+
+        case None =>
+          dataCache.put(msg.key, msg.value)
+          self ! RegisterData(msg.key, msg.value)
+      }
+  }
+
+  /** Asks the watcher service to notify this actor when the given key is deleted. */
+  private def watchDeletion(key: String, value: String): Unit =
+    watcherService ! WatchEndpoint(key, value, isPrefix = false, watcherName, Set(DeleteEvent))
+
+  /**
+   * Sends the request to the worker and marks the key as in progress,
+   * or runs `buffer` when another operation for the key is still in progress.
+   */
+  private def dispatchOrElse(key: String, request: Any)(buffer: => Unit): Unit =
+    if (inProgressKeys.contains(key)) {
+      buffer
+    } else {
+      worker ! request
+      inProgressKeys = inProgressKeys + key
+    }
+
+  /** Appends the request to the waiting operations of the key. */
+  private def enqueue(key: String, request: Any): Unit =
+    operations.getOrElseUpdate(key, Queue.empty[Any]).enqueue(request)
+
+  /**
+   * Appends the request after dropping the waiting put|delete operations of the key.
+   * The put|delete operations against the same key overwrite the previous results. For example, if we put a
+   * value, delete it and put a new value again, the final result will be the new value, so the older
+   * operations can be removed. Other operations such as ElectLeader are kept.
+   */
+  private def enqueueSuperseding(key: String, request: Any): Unit = {
+    val remaining = operations.getOrElseUpdate(key, Queue.empty[Any]).filter {
+      case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
+      case _                                                           => true
+    }
+    remaining.enqueue(request)
+    operations.update(key, remaining)
+  }
+}
+
+object DataManagementService {
+
+  /** The delay before a failed operation is retried, loaded from the configuration. */
+  val retryInterval: FiniteDuration = loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval)
+
+  /** Creates the Props to instantiate a [[DataManagementService]] with the given watcher service and worker factory. */
+  def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
+    implicit logging: Logging,
+    actorSystem: ActorSystem): Props =
+    Props(new DataManagementService(watcherService, workerFactory))
+}
+
+/**
+ * The worker which executes the ETCD operations requested by its parent actor.
+ * It reports the completion of each operation to the parent with [[FinishWork]].
+ * A failed operation is sent to this actor again after `retryInterval`, so it is retried until it succeeds.
+ *
+ * @param etcdClient the client used to access ETCD
+ * @param leaseService the actor which replies with a [[Lease]] to a GetLease request
+ */
+private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+                                                                                  actorSystem: ActorSystem,
+                                                                                  logging: Logging)
+    extends Actor {
+
+  private val dataManagementService = context.parent
+  // it is updated by the actor and reset by the callbacks of futures which run on other threads
+  @volatile private var lease: Option[Lease] = None
+  leaseService ! GetLease
+
+  override def receive: Receive = {
+    case msg: Lease =>
+      lease = Some(msg)
+
+    // leader election + endpoint management
+    case request: ElectLeader =>
+      withLease(request, "lease not found, retry storing data") { l =>
+        etcdClient
+          .electLeader(request.key, request.value, l)
+          .andThen {
+            case Success(msg) =>
+              request.recipient ! ElectionResult(msg)
+              dataManagementService ! FinishWork(request.key)
+          }
+          .recover(
+            retryOnFailure(
+              request,
+              t => s"a lease is expired while leader election, reissue it: $t",
+              t => s"unexpected error happened: $t, retry storing data"))
+      }
+
+    // only endpoint management
+    case request: RegisterData =>
+      withLease(request, s"lease not found, retry storing data ${request.key}") { l =>
+        etcdClient
+          .put(request.key, request.value, l.id)
+          .andThen {
+            case Success(_) =>
+              dataManagementService ! FinishWork(request.key)
+          }
+          .recover(
+            retryOnFailure(
+              request,
+              t => s"a lease is expired while registering data ${request.key}, reissue it: $t",
+              t => s"unexpected error happened: $t, retry storing data ${request.key}"))
+      }
+
+    // it stores the data if and only if there is no such one
+    case request: RegisterInitialData =>
+      withLease(request, s"lease not found, retry storing data for ${request.key}") { l =>
+        etcdClient
+          .putTxn(request.key, request.value, 0, l.id)
+          .map { res =>
+            dataManagementService ! FinishWork(request.key)
+            if (res.getSucceeded) {
+              logging.info(this, s"initial data storing succeeds for ${request.key}")
+              request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Right(Done())))
+            } else {
+              logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
+              request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
+            }
+          }
+          .recover(
+            retryOnFailure(
+              request,
+              t => s"a lease is expired while registering an initial data ${request.key}, reissue it: $t",
+              t => s"unexpected error happened: $t, retry storing data for ${request.key}"))
+      }
+
+    // the deletion does not require a lease
+    case msg: WatcherClosed =>
+      etcdClient
+        .del(msg.key)
+        .andThen {
+          case Success(_) =>
+            dataManagementService ! FinishWork(msg.key)
+        }
+        .recover(
+          retryOnFailure(
+            msg,
+            t => s"a lease is expired while deleting data ${msg.key}, reissue it: $t",
+            t => s"unexpected error happened: $t, retry deleting data for ${msg.key}"))
+
+  }
+
+  /**
+   * Runs the operation with the current lease.
+   * If there is no lease, it requests a new one and retries the request after `retryInterval`.
+   */
+  private def withLease(request: Any, notFoundMessage: => String)(operation: Lease => Any): Unit =
+    lease match {
+      case Some(l) =>
+        operation(l)
+
+      case None =>
+        logging.warn(this, notFoundMessage)
+        leaseService ! GetLease
+        sendMessageToSelfAfter(request, retryInterval)
+    }
+
+  /**
+   * The failure handler shared by all operations, it retries the request forever until it succeeds.
+   * On a StatusRuntimeException the lease is discarded and requested again before the retry.
+   * The retry is not immediate, the request is sent to this actor again after `retryInterval`.
+   */
+  private def retryOnFailure(request: Any,
+                             leaseExpiredMessage: Throwable => String,
+                             unexpectedMessage: Throwable => String): PartialFunction[Throwable, Any] = {
+    case t: StatusRuntimeException =>
+      logging.warn(this, leaseExpiredMessage(t))
+      lease = None
+      leaseService ! GetLease
+      sendMessageToSelfAfter(request, retryInterval)
+
+    case t: Throwable =>
+      logging.warn(this, unexpectedMessage(t))
+      sendMessageToSelfAfter(request, retryInterval)
+  }
+
+  private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+    actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
+  }
+}
+
+object EtcdWorker {
+
+  /** Creates the Props to instantiate an [[EtcdWorker]] with the given ETCD client and lease service. */
+  def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
+                                                            actorSystem: ActorSystem,
+                                                            logging: Logging): Props =
+    Props(new EtcdWorker(etcdClient, leaseService))
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
index 3c4f8c6..edf4880 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
@@ -52,6 +52,7 @@ case class WatchEndpointInserted(override val watchKey: 
String,
     extends WatchEndpointOperation(watchKey, key, value, isPrefix)
 case class WatcherClosed(key: String, isPrefix: Boolean)
 
+// These are abstractions of the events from ETCD.
 sealed trait EtcdEvent
 case object PutEvent extends EtcdEvent
 case object DeleteEvent extends EtcdEvent
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
index 816693a..5285c6d 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/common/etcd/EtcdLeaderShipUnitTests.scala
@@ -28,7 +28,8 @@ import com.ibm.etcd.client.{EtcdClient => Client}
 import common.{StreamLogging, WskActorSystem}
 import io.grpc.{StatusRuntimeException, Status => GrpcStatus}
 import org.apache.openwhisk.core.etcd.EtcdType._
-import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, 
EtcdLeadershipApi, Lease}
+import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader, 
EtcdLeadershipApi}
+import org.apache.openwhisk.core.service.Lease
 import org.junit.runner.RunWith
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.ScalaFutures
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala
new file mode 100644
index 0000000..85dc174
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/service/DataManagementServiceTests.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, 
TestProbe}
+import akka.util.Timeout
+import common.StreamLogging
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class DataManagementServiceTests
+    extends TestKit(ActorSystem("DataManagementService"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = system.dispatcher
+
+  val schedulerId = SchedulerInstanceId("scheduler0")
+  val instanceId = schedulerId
+  val leaseService = TestProbe()
+  val watcherName = "data-management-service"
+  leaseService.setAutoPilot((sender: ActorRef, msg: Any) =>
+    msg match {
+      case GetLease =>
+        sender ! Lease(10, 10)
+        TestActor.KeepRunning
+
+      case _ =>
+        TestActor.KeepRunning
+  })
+
+  private def etcdWorkerFactory(actor: ActorRef) = { (_: ActorRefFactory) =>
+    actor
+  }
+
+  behavior of "DataManagementService"
+
+  it should "distribute work to etcd worker" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+
+    val requests = Seq(
+      RegisterData(key, value),
+      ElectLeader(key, value, self),
+      RegisterInitialData(key, value, recipient = Some(testActor)),
+      WatcherClosed(key, false))
+
+    requests.foreach { request =>
+      service ! request
+      worker.expectMsg(request)
+
+      service ! FinishWork(key)
+    }
+  }
+
+  it should "handle request sequentially for a same key" in {
+    // the queue is accessed by the future, the worker actor, and the test thread, so it must be thread-safe
+    val queue = new java.util.concurrent.ConcurrentLinkedQueue[String]()
+    val watcherService = TestProbe()
+    val workerFactory = (f: ActorRefFactory) =>
+      f.actorOf(Props(new Actor {
+        override def receive: Receive = {
+          case request: RegisterData =>
+            if (request.value == "async") {
+              // resolve the parent here as the actor context should not be accessed inside the future
+              val parent = context.parent
+              Future {
+                Thread.sleep(1000)
+                queue.add(request.value)
+                parent ! FinishWork(request.key)
+              }
+            } else {
+              queue.add(request.value)
+              context.parent ! FinishWork(request.key)
+            }
+        }
+      }))
+
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, workerFactory))
+
+    // the first request will be handled asynchronously, but as the second request has the same key,
+    // it will always be processed after the first request is finished
+    val requests = Seq(RegisterData(key, "async"), RegisterData(key, value))
+
+    requests.foreach { request =>
+      service ! request
+    }
+
+    // wait until both requests are completed instead of sleeping for a fixed time
+    awaitAssert(queue.size shouldBe 2, 5.seconds)
+    queue.poll() shouldBe "async"
+    queue.poll() shouldBe value // the second request should wait until the first one is finished
+    queue.isEmpty shouldBe true
+  }
+
+  it should "handle request concurrently for different keys" in {
+    // the queue is accessed by the future, the worker actor, and the test thread, so it must be thread-safe
+    val queue = new java.util.concurrent.ConcurrentLinkedQueue[String]()
+    val watcherService = TestProbe()
+    val workerFactory = (f: ActorRefFactory) =>
+      f.actorOf(Props(new Actor {
+        override def receive: Receive = {
+          case request: RegisterData =>
+            if (request.value == "async") {
+              // resolve the parent here as the actor context should not be accessed inside the future
+              val parent = context.parent
+              Future {
+                Thread.sleep(1000)
+                queue.add(request.value)
+                parent ! FinishWork(request.key)
+              }
+            } else {
+              queue.add(request.value)
+              context.parent ! FinishWork(request.key)
+            }
+        }
+      }))
+
+    val key = "testKey"
+    val key2 = "testKey2"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, workerFactory))
+
+    val requests = Seq(RegisterData(key, "async"), RegisterData(key2, value))
+
+    requests.foreach { request =>
+      service ! request
+    }
+
+    // wait until both requests are completed instead of sleeping for a fixed time
+    awaitAssert(queue.size shouldBe 2, 5.seconds)
+    queue.poll() shouldBe value // the second request should be completed first because it doesn't wait
+    queue.poll() shouldBe "async"
+    queue.isEmpty shouldBe true
+  }
+
+  it should "remove unnecessary operation" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+
+    service ! RegisterData(key, value) // occupy the resource
+    worker.expectMsg(RegisterData(key, value))
+
+    service ! RegisterInitialData(key, value) // this request should also be 
removed
+
+    val requests = Random.shuffle(
+      Seq(RegisterData(key, value), RegisterData(key, value), 
WatcherClosed(key, false), WatcherClosed(key, false)))
+    // below requests will be merged into one request and wait in the queue
+    requests.foreach { request =>
+      service ! request
+    }
+    worker.expectNoMessage()
+
+    service ! FinishWork(key) // release the resource
+
+    worker.expectMsg(requests(3)) // only the last one should be distributed
+  }
+
+  it should "register data when the target endpoint is removed" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+
+    // no new watcher is registered as it assumes the one is already 
registered.
+    service ! WatchEndpointRemoved(key, key, value, isPrefix = false)
+    worker.expectMsg(RegisterInitialData(key, value, false, None))
+  }
+
+  it should "ignore prefixed endpoint-removed results" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+    service ! WatchEndpointRemoved("", key, value, isPrefix = true)
+
+    worker.expectNoMessage()
+  }
+
+  it should "deregister data" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    watcherService.setAutoPilot((sender, msg) => {
+      msg match {
+        case UnwatchEndpoint(key, isPrefix, _, _) =>
+          sender ! WatcherClosed(key, isPrefix)
+          TestActor.KeepRunning
+      }
+    })
+    val key = "testKey"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+    service ! UnregisterData(key)
+
+    watcherService.expectMsg(UnwatchEndpoint(key, isPrefix = false, 
watcherName, true))
+    worker.expectMsg(WatcherClosed(key, false))
+  }
+
+  it should "store the resource data" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+    service ! UpdateDataOnChange(key, value)
+
+    worker.expectMsg(RegisterData(key, value))
+    service.underlyingActor.dataCache.size shouldBe 1
+  }
+
+  it should "not store the resource data if there is no change from the last 
one" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    val key = "testKey"
+    val value = "testValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+    service ! UpdateDataOnChange(key, value)
+
+    worker.expectMsg(RegisterData(key, value))
+    service.underlyingActor.dataCache.size shouldBe 1
+
+    service ! UpdateDataOnChange(key, value)
+    worker.expectNoMessage()
+    service.underlyingActor.dataCache.size shouldBe 1
+  }
+
+  it should "store the resource data if there is change from the last one" in {
+    val watcherService = TestProbe()
+    val worker = TestProbe()
+    val key = "testKey"
+    val value = "testValue"
+    val newValue = "newTestValue"
+
+    val service = TestActorRef(new DataManagementService(watcherService.ref, 
etcdWorkerFactory(worker.ref)))
+    service ! UpdateDataOnChange(key, value)
+
+    worker.expectMsg(RegisterData(key, value))
+    service.underlyingActor.dataCache.size shouldBe 1
+
+    service ! FinishWork(key)
+    service ! UpdateDataOnChange(key, newValue)
+    worker.expectMsg(RegisterData(key, newValue, false))
+    service.underlyingActor.dataCache.size shouldBe 1
+  }
+}
diff --git a/tools/travis/runStandaloneTests.sh 
b/tools/travis/runStandaloneTests.sh
index 0d11294..144dbbb 100755
--- a/tools/travis/runStandaloneTests.sh
+++ b/tools/travis/runStandaloneTests.sh
@@ -47,6 +47,7 @@ kubectl config set-context --current --namespace=default
 # This is required because it is timed out to pull the image during the test.
 docker pull openwhisk/action-nodejs-v10:nightly
 docker pull openwhisk/dockerskeleton:nightly
+docker pull openwhisk/apigateway:0.11.0
 
 cd $ROOTDIR
 TERM=dumb ./gradlew :core:standalone:build \

Reply via email to