This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e05aa44  [New Scheduler] Implement KeepAliveService (#5067)
e05aa44 is described below

commit e05aa44b0cab519c82cf84a8171671a21d779562
Author: 김건희 <[email protected]>
AuthorDate: Thu Mar 11 10:07:02 2021 +0900

    [New Scheduler] Implement KeepAliveService (#5067)
    
    * Add KeepAliveService
    
    * Add KeepAliveService to Scheduler
    
    * Update name for case class
    
    * Include workers in state data to make it thread safe
---
 .../org/apache/openwhisk/common/Logging.scala      |   4 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   3 +
 .../core/service/LeaseKeepAliveService.scala       | 179 ++++++++++++++++
 .../openwhisk/core/scheduler/Scheduler.scala       |  17 +-
 .../core/service/LeaseKeepAliveServiceTests.scala  | 232 +++++++++++++++++++++
 5 files changed, 428 insertions(+), 7 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 9efc977..885b2a2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -338,6 +338,7 @@ object LoggingMarkers {
   val timeout = "timeout"
 
   private val controller = "controller"
+  private val scheduler = "scheduler"
   private val invoker = "invoker"
   private val database = "database"
   private val activation = "activation"
@@ -555,6 +556,9 @@ object LoggingMarkers {
       LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> 
topic))(MeasurementUnit.time.milliseconds)
     else LogMarkerToken(kafka, topic, start, 
Some("delay"))(MeasurementUnit.time.milliseconds)
 
+  def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
+    LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> 
leaseId.toString))(MeasurementUnit.none)
+
   /*
    * General markers
    */
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index ba15f5f..67cf783 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -266,6 +266,7 @@ object ConfigKeys {
   val controllerActivation = s"$controller.activation"
 
   val etcd = "whisk.etcd"
+  val etcdLeaseTimeout = "whisk.etcd.lease.timeout"
   val etcdPoolThreads = "whisk.etcd.pool.threads"
 
   val activationStore = "whisk.activation-store"
@@ -290,5 +291,7 @@ object ConfigKeys {
 
   val azBlob = "whisk.azure-blob"
 
+  val schedulerMaxPeek = "whisk.scheduler.max-peek"
+
   val whiskClusterName = "whisk.cluster.name"
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
new file mode 100644
index 0000000..c29b878
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
+import akka.pattern.pipe
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.InstanceId
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdKV.InstanceKeys.instanceLease
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
+
+// States
+sealed trait KeepAliveServiceState
+case object Ready extends KeepAliveServiceState
+case object Active extends KeepAliveServiceState
+
+// Data
+sealed trait KeepAliveServiceData
+case object NoData extends KeepAliveServiceData
+case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
+case class ActiveStates(worker: Cancellable, lease: Lease) extends 
KeepAliveServiceData
+
+// Events received by the actor
+case object RegrantLease
+case object GetLease
+case object GrantLease
+
+// Events internally used
+case class SetLease(lease: Lease)
+case class SetWatcher(worker: Cancellable)
+
+class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, 
watcherService: ActorRef)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
+    extends FSM[KeepAliveServiceState, KeepAliveServiceData]
+    with Stash {
+
+  implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+  private val leaseTimeout = 
loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
+  private val key = instanceLease(instanceId)
+  private val watcherName = "lease-service"
+
+  self ! GrantLease
+  startWith(Ready, NoData)
+
+  when(Ready) {
+    case Event(GrantLease, NoData) =>
+      etcdClient
+        .grant(leaseTimeout.toSeconds)
+        .map { res =>
+          SetLease(Lease(res.getID, res.getTTL))
+        }
+        .pipeTo(self)
+      stay
+
+    case Event(SetLease(lease), NoData) =>
+      startKeepAliveService(lease)
+        .pipeTo(self)
+      logging.info(this, s"Granted a new lease $lease")
+      stay using lease
+
+    case Event(SetWatcher(w), l: Lease) =>
+      goto(Active) using ActiveStates(w, l)
+
+    case Event(t: FailureMessage, _) =>
+      logging.warn(this, s"Failed to grant new lease caused by: $t")
+      self ! GrantLease
+      stay()
+
+    case _ => delay
+  }
+
+  when(Active) {
+    case Event(WatchEndpointRemoved(`key`, `key`, _, false), 
ActiveStates(worker, lease)) =>
+      logging.info(this, s"endpoint is removed so recreate a lease")
+      recreateLease(worker, lease)
+
+    case Event(RegrantLease, ActiveStates(worker, lease)) =>
+      logging.info(this, s"ReGrant a lease, old lease:${lease}")
+      recreateLease(worker, lease)
+
+    case Event(GetLease, ActiveStates(_, lease)) =>
+      logging.info(this, s"send the lease(${lease}) to ${sender()}")
+      sender() ! lease
+      stay()
+
+    case _ => delay
+  }
+
+  initialize()
+
+  private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
+    val worker =
+      actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 
500.milliseconds)(keepAliveOnce(lease))
+
+    /**
+     * To verify that lease has been deleted since timeout,
+     * create a key using lease, watch the key, and receive an event for 
deletion.
+     */
+    etcdClient.put(key, s"${lease.id}", lease.id).map { _ =>
+      watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName, 
Set(DeleteEvent))
+      SetWatcher(worker)
+    }
+  }
+
+  private def keepAliveOnce(lease: Lease): Future[Long] = {
+    etcdClient
+      .keepAliveOnce(lease.id)
+      .map(_.getID)
+      .andThen {
+        case Success(_) => 
MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_KEEP_ALIVE(lease.id))
+        case Failure(t) =>
+          logging.warn(this, s"Failed to keep-alive of ${lease.id} caused by 
${t}")
+          self ! RegrantLease
+      }
+  }
+
+  private def recreateLease(worker: Cancellable, lease: Lease) = {
+    logging.info(this, s"recreate a lease, old lease: $lease")
+    worker.cancel() // stop scheduler
+    watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
+    etcdClient
+      .revoke(lease.id) // delete lease
+      .onComplete(_ => self ! GrantLease) // create lease
+    goto(Ready) using NoData
+  }
+
+  // Unstash all messages stashed while in intermediate state
+  onTransition {
+    case _ -> Ready  => unstashAll()
+    case _ -> Active => unstashAll()
+  }
+
+  /** Delays all incoming messages until unstashAll() is called */
+  def delay = {
+    stash()
+    stay
+  }
+
+  override def postStop(): Unit = {
+    stateData match {
+      case ActiveStates(w, _) => w.cancel() // stop the scheduler if it exists
+      case _                  => // do nothing
+    }
+    watcherService ! UnwatchEndpoint(key, false, watcherName)
+  }
+}
+
+object LeaseKeepAliveService {
+  def props(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: 
ActorRef)(
+    implicit logging: Logging,
+    actorSystem: ActorSystem): Props = {
+    Props(new LeaseKeepAliveService(etcdClient, instanceId, watcherService))
+      .withDispatcher("dispatchers.lease-service-dispatcher")
+  }
+}
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 6bb4311..d9bb08d 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -18,19 +18,21 @@
 package org.apache.openwhisk.core.scheduler
 
 import akka.Done
-import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, 
CoordinatedShutdown}
 import akka.stream.ActorMaterializer
 import akka.util.Timeout
 import com.typesafe.config.ConfigValueFactory
 import kamon.Kamon
 import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common._
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
 import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.database.{ActivationStoreProvider, 
NoDocumentException, UserContext}
 import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.service.{LeaseKeepAliveService, 
WatcherService}
 import org.apache.openwhisk.http.BasicHttpService
 import org.apache.openwhisk.spi.SpiLoader
 import org.apache.openwhisk.utils.ExecutionContextFactory
@@ -55,10 +57,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, 
schedulerEndpoints: SchedulerE
   val msgProvider = SpiLoader.get[MessagingProvider]
   val producer = msgProvider.getProducer(config, 
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
 
-  val maxPeek = "" // TODO: TBD
-  val etcdClient = "" // TODO: TBD
-  val watcherService = "" // TODO: TBD
-  val leaseService = "" // TODO: TBD
+  val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
+  val etcdClient = 
EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+  val watcherService: ActorRef = 
actorSystem.actorOf(WatcherService.props(etcdClient))
+  val leaseService =
+    actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, 
watcherService))
 
   implicit val entityStore = WhiskEntityStore.datastore()
   private val activationStore =
@@ -139,7 +142,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, 
schedulerEndpoints: SchedulerE
     config,
     s"scheduler${schedulerId.asString}",
     s"scheduler${schedulerId.asString}",
-    500, // TODO: to be updated with maxPeek variable
+    maxPeek,
     maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
   implicit val trasnid = TransactionId.containerCreation
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
new file mode 100644
index 0000000..2a7f1b6
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.{LeaseGrantResponse, LeaseKeepAliveResponse, 
LeaseRevokeResponse, PutResponse}
+import common.StreamLogging
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.entity.{ExecManifest, SchedulerInstanceId}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdKV}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+@RunWith(classOf[JUnitRunner])
+class LeaseKeepAliveServiceTests
+    extends TestKit(ActorSystem("LeaseKeepAliveService"))
+    with ImplicitSender
+    with FlatSpecLike
+    with ScalaFutures
+    with Matchers
+    with MockFactory
+    with BeforeAndAfterAll
+    with StreamLogging {
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+  implicit val ec: ExecutionContext = system.dispatcher
+  val config = new WhiskConfig(ExecManifest.requiredProperties)
+  val testInstanceId = SchedulerInstanceId("0")
+  val testLeaseId = 10
+  val newTestLeaseId = 20
+  val testTtl = 1
+  val testLease = Lease(testLeaseId, testTtl)
+  val newTestLease = Lease(newTestLeaseId, testTtl)
+  val testKey = EtcdKV.InstanceKeys.instanceLease(testInstanceId)
+  val newTestKey = EtcdKV.InstanceKeys.instanceLease(testInstanceId)
+
+  val watcherName = "lease-service"
+
+  def grant(etcd: EtcdClient): Unit = {
+    (etcd
+      .grant(_: Long))
+      .expects(*)
+      
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(testLeaseId).setTTL(testTtl).build()))
+  }
+
+  def put(etcd: EtcdClient): Unit = {
+    (etcd
+      .put(_: String, _: String, _: Long))
+      .expects(testKey, *, *)
+      .returning(Future.successful(PutResponse.newBuilder().build()))
+  }
+
+  def keepAliveOnce(etcd: EtcdClient): Unit = {
+    (etcd
+      .keepAliveOnce(_: Long))
+      .expects(testLeaseId)
+      
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(testLeaseId).build()))
+      .anyNumberOfTimes()
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+    super.afterAll()
+  }
+
+  behavior of "LeaseKeepAliveService"
+
+  it should "grant new lease" in {
+
+    val mockEtcd = mock[EtcdClient]
+    grant(mockEtcd)
+    put(mockEtcd)
+    keepAliveOnce(mockEtcd)
+
+    val watcher = TestProbe()
+    val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, 
testInstanceId, watcher.ref))
+
+    Thread.sleep(1000)
+    service.stateName shouldBe Active
+    service.stateData shouldBe a[ActiveStates]
+    service.stateData match {
+      case ActiveStates(_, lease) => lease shouldBe testLease
+      case _                      => fail()
+    }
+    watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, 
watcherName, Set(DeleteEvent)))
+
+  }
+
+  it should "regrant a new lease while old lease is deleted" in {
+    val mockEtcd = mock[EtcdClient]
+    grant(mockEtcd)
+    put(mockEtcd)
+    keepAliveOnce(mockEtcd)
+
+    (mockEtcd
+      .revoke(_: Long))
+      .expects(testLeaseId)
+      .returning(Future.successful(LeaseRevokeResponse.newBuilder().build()))
+    (mockEtcd
+      .grant(_: Long))
+      .expects(*)
+      
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(newTestLeaseId).setTTL(testTtl).build()))
+    (mockEtcd
+      .put(_: String, _: String, _: Long))
+      .expects(newTestKey, *, *)
+      .returning(Future.successful(PutResponse.newBuilder().build()))
+    (mockEtcd
+      .keepAliveOnce(_: Long))
+      .expects(newTestLeaseId)
+      
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(newTestLeaseId).build()))
+      .anyNumberOfTimes()
+
+    val watcher = TestProbe()
+    val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, 
testInstanceId, watcher.ref))
+
+    service.stateName shouldBe Active
+    service.stateData shouldBe a[ActiveStates]
+    service.stateData match {
+      case ActiveStates(_, lease) => lease shouldBe testLease
+      case _                      => fail()
+    }
+    watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, 
watcherName, Set(DeleteEvent)))
+
+    service ! WatchEndpointRemoved(testKey, testKey, testLease.id.toString, 
false)
+
+    watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
+    Thread.sleep(500) //wait for the lease to be granted
+
+    service.stateName shouldBe Active
+    service.stateData shouldBe a[ActiveStates]
+    service.stateData match {
+      case ActiveStates(_, lease) => lease shouldBe newTestLease
+      case _                      => fail()
+    }
+    watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, 
false, watcherName, Set(DeleteEvent)))
+  }
+
+  it should "get lease" in {
+    val mockEtcd = mock[EtcdClient]
+    grant(mockEtcd)
+    put(mockEtcd)
+    keepAliveOnce(mockEtcd)
+
+    val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, 
testInstanceId, TestProbe().ref))
+
+    (service ? GetLease).mapTo[Lease].futureValue shouldBe testLease
+  }
+
+  it should "regrant a new lease when keepalive is failed" in {
+    val mockEtcd = mock[EtcdClient]
+    grant(mockEtcd)
+    put(mockEtcd)
+
+    (mockEtcd
+      .keepAliveOnce(_: Long))
+      .expects(testLeaseId)
+      
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(testLeaseId).build()))
+      .noMoreThanTwice()
+
+    (mockEtcd
+      .keepAliveOnce(_: Long))
+      .expects(testLeaseId)
+      .returning(Future.failed(new RuntimeException("failed to keep alive the 
lease")))
+      .noMoreThanOnce()
+
+    (mockEtcd
+      .revoke(_: Long))
+      .expects(testLeaseId)
+      .returning(Future.successful(LeaseRevokeResponse.newBuilder().build()))
+
+    (mockEtcd
+      .keepAliveOnce(_: Long))
+      .expects(newTestLeaseId)
+      
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(newTestLeaseId).build()))
+      .anyNumberOfTimes()
+
+    (mockEtcd
+      .grant(_: Long))
+      .expects(*)
+      
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(newTestLeaseId).setTTL(testTtl).build()))
+    (mockEtcd
+      .put(_: String, _: String, _: Long))
+      .expects(newTestKey, *, *)
+      .returning(Future.successful(PutResponse.newBuilder().build()))
+
+    val watcher = TestProbe()
+    val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd, 
testInstanceId, watcher.ref))
+    service.stateName shouldBe Active
+    service.stateData shouldBe a[ActiveStates]
+    service.stateData match {
+      case ActiveStates(_, lease) => lease shouldBe testLease
+      case _                      => fail()
+    }
+    watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false, 
watcherName, Set(DeleteEvent)))
+
+    watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
+    Thread.sleep(1500) //wait for the lease to be granted
+
+    service.stateName shouldBe Active
+    service.stateData shouldBe a[ActiveStates]
+    service.stateData match {
+      case ActiveStates(_, lease) => lease shouldBe newTestLease
+      case _                      => fail()
+    }
+    watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString, 
false, watcherName, Set(DeleteEvent)))
+  }
+
+}

Reply via email to