This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e05aa44 [New Scheduler] Implement KeepAliveService (#5067)
e05aa44 is described below
commit e05aa44b0cab519c82cf84a8171671a21d779562
Author: 김건희 <[email protected]>
AuthorDate: Thu Mar 11 10:07:02 2021 +0900
[New Scheduler] Implement KeepAliveService (#5067)
* Add KeepAliveService
* Add KeepAliveService to Scheduler
* Update name for case class
* Include workers in state data for thread safety
---
.../org/apache/openwhisk/common/Logging.scala | 4 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 3 +
.../core/service/LeaseKeepAliveService.scala | 179 ++++++++++++++++
.../openwhisk/core/scheduler/Scheduler.scala | 17 +-
.../core/service/LeaseKeepAliveServiceTests.scala | 232 +++++++++++++++++++++
5 files changed, 428 insertions(+), 7 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 9efc977..885b2a2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -338,6 +338,7 @@ object LoggingMarkers {
val timeout = "timeout"
private val controller = "controller"
+ private val scheduler = "scheduler"
private val invoker = "invoker"
private val database = "database"
private val activation = "activation"
@@ -555,6 +556,9 @@ object LoggingMarkers {
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" ->
topic))(MeasurementUnit.time.milliseconds)
else LogMarkerToken(kafka, topic, start,
Some("delay"))(MeasurementUnit.time.milliseconds)
+ def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
+ LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" ->
leaseId.toString))(MeasurementUnit.none)
+
/*
* General markers
*/
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index ba15f5f..67cf783 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -266,6 +266,7 @@ object ConfigKeys {
val controllerActivation = s"$controller.activation"
val etcd = "whisk.etcd"
+ val etcdLeaseTimeout = "whisk.etcd.lease.timeout"
val etcdPoolThreads = "whisk.etcd.pool.threads"
val activationStore = "whisk.activation-store"
@@ -290,5 +291,7 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"
+ val schedulerMaxPeek = "whisk.scheduler.max-peek"
+
val whiskClusterName = "whisk.cluster.name"
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
new file mode 100644
index 0000000..c29b878
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/service/LeaseKeepAliveService.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
+import akka.pattern.pipe
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.InstanceId
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdKV.InstanceKeys.instanceLease
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
+
+// States
+sealed trait KeepAliveServiceState
+case object Ready extends KeepAliveServiceState
+case object Active extends KeepAliveServiceState
+
+// Data
+sealed trait KeepAliveServiceData
+case object NoData extends KeepAliveServiceData
+case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
+case class ActiveStates(worker: Cancellable, lease: Lease) extends
KeepAliveServiceData
+
+// Events received by the actor
+case object RegrantLease
+case object GetLease
+case object GrantLease
+
+// Events used internally by the actor
+case class SetLease(lease: Lease)
+case class SetWatcher(worker: Cancellable)
+
+class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId,
watcherService: ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem)
+ extends FSM[KeepAliveServiceState, KeepAliveServiceData]
+ with Stash {
+
+ implicit val ec: ExecutionContextExecutor = context.dispatcher
+
+ private val leaseTimeout =
loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
+ private val key = instanceLease(instanceId)
+ private val watcherName = "lease-service"
+
+ self ! GrantLease
+ startWith(Ready, NoData)
+
+ when(Ready) {
+ case Event(GrantLease, NoData) =>
+ etcdClient
+ .grant(leaseTimeout.toSeconds)
+ .map { res =>
+ SetLease(Lease(res.getID, res.getTTL))
+ }
+ .pipeTo(self)
+ stay
+
+ case Event(SetLease(lease), NoData) =>
+ startKeepAliveService(lease)
+ .pipeTo(self)
+ logging.info(this, s"Granted a new lease $lease")
+ stay using lease
+
+ case Event(SetWatcher(w), l: Lease) =>
+ goto(Active) using ActiveStates(w, l)
+
+ case Event(t: FailureMessage, _) =>
+ logging.warn(this, s"Failed to grant new lease caused by: $t")
+ self ! GrantLease
+ stay()
+
+ case _ => delay
+ }
+
+ when(Active) {
+ case Event(WatchEndpointRemoved(`key`, `key`, _, false),
ActiveStates(worker, lease)) =>
+ logging.info(this, s"endpoint ie removed so recreate a lease")
+ recreateLease(worker, lease)
+
+ case Event(RegrantLease, ActiveStates(worker, lease)) =>
+ logging.info(this, s"ReGrant a lease, old lease:${lease}")
+ recreateLease(worker, lease)
+
+ case Event(GetLease, ActiveStates(_, lease)) =>
+ logging.info(this, s"send the lease(${lease}) to ${sender()}")
+ sender() ! lease
+ stay()
+
+ case _ => delay
+ }
+
+ initialize()
+
+ private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
+ val worker =
+ actorSystem.scheduler.schedule(initialDelay = 0.second, interval =
500.milliseconds)(keepAliveOnce(lease))
+
+ /**
+ * To detect that the lease has been deleted after it timed out, create a
+ * key attached to the lease, watch that key, and receive its deletion event.
+ */
+ etcdClient.put(key, s"${lease.id}", lease.id).map { _ =>
+ watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName,
Set(DeleteEvent))
+ SetWatcher(worker)
+ }
+ }
+
+ private def keepAliveOnce(lease: Lease): Future[Long] = {
+ etcdClient
+ .keepAliveOnce(lease.id)
+ .map(_.getID)
+ .andThen {
+ case Success(_) =>
MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_KEEP_ALIVE(lease.id))
+ case Failure(t) =>
+ logging.warn(this, s"Failed to keep-alive of ${lease.id} caused by
${t}")
+ self ! RegrantLease
+ }
+ }
+
+ private def recreateLease(worker: Cancellable, lease: Lease) = {
+ logging.info(this, s"recreate a lease, old lease: $lease")
+ worker.cancel() // stop scheduler
+ watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
+ etcdClient
+ .revoke(lease.id) // delete lease
+ .onComplete(_ => self ! GrantLease) // create lease
+ goto(Ready) using NoData
+ }
+
+ // Unstash all messages stashed while in intermediate state
+ onTransition {
+ case _ -> Ready => unstashAll()
+ case _ -> Active => unstashAll()
+ }
+
+ /** Delays all incoming messages until unstashAll() is called */
+ def delay = {
+ stash()
+ stay
+ }
+
+ override def postStop(): Unit = {
+ stateData match {
+ case ActiveStates(w, _) => w.cancel() // stop scheduler if that exist
+ case _ => // do nothing
+ }
+ watcherService ! UnwatchEndpoint(key, false, watcherName)
+ }
+}
+
+object LeaseKeepAliveService {
+ def props(etcdClient: EtcdClient, instanceId: InstanceId, watcherService:
ActorRef)(
+ implicit logging: Logging,
+ actorSystem: ActorSystem): Props = {
+ Props(new LeaseKeepAliveService(etcdClient, instanceId, watcherService))
+ .withDispatcher("dispatchers.lease-service-dispatcher")
+ }
+}
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index 6bb4311..d9bb08d 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -18,19 +18,21 @@
package org.apache.openwhisk.core.scheduler
import akka.Done
-import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem,
CoordinatedShutdown}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem,
CoordinatedShutdown}
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.database.{ActivationStoreProvider,
NoDocumentException, UserContext}
import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.service.{LeaseKeepAliveService,
WatcherService}
import org.apache.openwhisk.http.BasicHttpService
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
@@ -55,10 +57,11 @@ class Scheduler(schedulerId: SchedulerInstanceId,
schedulerEndpoints: SchedulerE
val msgProvider = SpiLoader.get[MessagingProvider]
val producer = msgProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
- val maxPeek = "" // TODO: TBD
- val etcdClient = "" // TODO: TBD
- val watcherService = "" // TODO: TBD
- val leaseService = "" // TODO: TBD
+ val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
+ val etcdClient =
EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+ val watcherService: ActorRef =
actorSystem.actorOf(WatcherService.props(etcdClient))
+ val leaseService =
+ actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId,
watcherService))
implicit val entityStore = WhiskEntityStore.datastore()
private val activationStore =
@@ -139,7 +142,7 @@ class Scheduler(schedulerId: SchedulerInstanceId,
schedulerEndpoints: SchedulerE
config,
s"scheduler${schedulerId.asString}",
s"scheduler${schedulerId.asString}",
- 500, // TODO: to be updated with maxPeek variable
+ maxPeek,
maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
implicit val trasnid = TransactionId.containerCreation
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
new file mode 100644
index 0000000..2a7f1b6
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/service/LeaseKeepAliveServiceTests.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.ActorSystem
+import akka.pattern.ask
+import akka.testkit.{ImplicitSender, TestFSMRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.ibm.etcd.api.{LeaseGrantResponse, LeaseKeepAliveResponse,
LeaseRevokeResponse, PutResponse}
+import common.StreamLogging
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.entity.{ExecManifest, SchedulerInstanceId}
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdKV}
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+@RunWith(classOf[JUnitRunner])
+class LeaseKeepAliveServiceTests
+ extends TestKit(ActorSystem("LeaseKeepAliveService"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ implicit val ec: ExecutionContext = system.dispatcher
+ val config = new WhiskConfig(ExecManifest.requiredProperties)
+ val testInstanceId = SchedulerInstanceId("0")
+ val testLeaseId = 10
+ val newTestLeaseId = 20
+ val testTtl = 1
+ val testLease = Lease(testLeaseId, testTtl)
+ val newTestLease = Lease(newTestLeaseId, testTtl)
+ val testKey = EtcdKV.InstanceKeys.instanceLease(testInstanceId)
+ val newTestKey = EtcdKV.InstanceKeys.instanceLease(testInstanceId)
+
+ val watcherName = "lease-service"
+
+ def grant(etcd: EtcdClient): Unit = {
+ (etcd
+ .grant(_: Long))
+ .expects(*)
+
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(testLeaseId).setTTL(testTtl).build()))
+ }
+
+ def put(etcd: EtcdClient): Unit = {
+ (etcd
+ .put(_: String, _: String, _: Long))
+ .expects(testKey, *, *)
+ .returning(Future.successful(PutResponse.newBuilder().build()))
+ }
+
+ def keepAliveOnce(etcd: EtcdClient): Unit = {
+ (etcd
+ .keepAliveOnce(_: Long))
+ .expects(testLeaseId)
+
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(testLeaseId).build()))
+ .anyNumberOfTimes()
+ }
+
+ override def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(system)
+ super.afterAll()
+ }
+
+ behavior of "LeaseKeepAliveService"
+
+ it should "grant new lease" in {
+
+ val mockEtcd = mock[EtcdClient]
+ grant(mockEtcd)
+ put(mockEtcd)
+ keepAliveOnce(mockEtcd)
+
+ val watcher = TestProbe()
+ val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd,
testInstanceId, watcher.ref))
+
+ Thread.sleep(1000)
+ service.stateName shouldBe Active
+ service.stateData shouldBe a[ActiveStates]
+ service.stateData match {
+ case ActiveStates(_, lease) => lease shouldBe testLease
+ case _ => fail()
+ }
+ watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false,
watcherName, Set(DeleteEvent)))
+
+ }
+
+ it should "regrant a new lease while old lease is deleted" in {
+ val mockEtcd = mock[EtcdClient]
+ grant(mockEtcd)
+ put(mockEtcd)
+ keepAliveOnce(mockEtcd)
+
+ (mockEtcd
+ .revoke(_: Long))
+ .expects(testLeaseId)
+ .returning(Future.successful(LeaseRevokeResponse.newBuilder().build()))
+ (mockEtcd
+ .grant(_: Long))
+ .expects(*)
+
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(newTestLeaseId).setTTL(testTtl).build()))
+ (mockEtcd
+ .put(_: String, _: String, _: Long))
+ .expects(newTestKey, *, *)
+ .returning(Future.successful(PutResponse.newBuilder().build()))
+ (mockEtcd
+ .keepAliveOnce(_: Long))
+ .expects(newTestLeaseId)
+
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(newTestLeaseId).build()))
+ .anyNumberOfTimes()
+
+ val watcher = TestProbe()
+ val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd,
testInstanceId, watcher.ref))
+
+ service.stateName shouldBe Active
+ service.stateData shouldBe a[ActiveStates]
+ service.stateData match {
+ case ActiveStates(_, lease) => lease shouldBe testLease
+ case _ => fail()
+ }
+ watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false,
watcherName, Set(DeleteEvent)))
+
+ service ! WatchEndpointRemoved(testKey, testKey, testLease.id.toString,
false)
+
+ watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
+ Thread.sleep(500) //wait for the lease to be granted
+
+ service.stateName shouldBe Active
+ service.stateData shouldBe a[ActiveStates]
+ service.stateData match {
+ case ActiveStates(_, lease) => lease shouldBe newTestLease
+ case _ => fail()
+ }
+ watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString,
false, watcherName, Set(DeleteEvent)))
+ }
+
+ it should "get lease" in {
+ val mockEtcd = mock[EtcdClient]
+ grant(mockEtcd)
+ put(mockEtcd)
+ keepAliveOnce(mockEtcd)
+
+ val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd,
testInstanceId, TestProbe().ref))
+
+ (service ? GetLease).mapTo[Lease].futureValue shouldBe testLease
+ }
+
+ it should "regrant a new lease when keepalive is failed" in {
+ val mockEtcd = mock[EtcdClient]
+ grant(mockEtcd)
+ put(mockEtcd)
+
+ (mockEtcd
+ .keepAliveOnce(_: Long))
+ .expects(testLeaseId)
+
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(testLeaseId).build()))
+ .noMoreThanTwice()
+
+ (mockEtcd
+ .keepAliveOnce(_: Long))
+ .expects(testLeaseId)
+ .returning(Future.failed(new RuntimeException("failed to keep alive the
lease")))
+ .noMoreThanOnce()
+
+ (mockEtcd
+ .revoke(_: Long))
+ .expects(testLeaseId)
+ .returning(Future.successful(LeaseRevokeResponse.newBuilder().build()))
+
+ (mockEtcd
+ .keepAliveOnce(_: Long))
+ .expects(newTestLeaseId)
+
.returning(Future.successful(LeaseKeepAliveResponse.newBuilder().setID(newTestLeaseId).build()))
+ .anyNumberOfTimes()
+
+ (mockEtcd
+ .grant(_: Long))
+ .expects(*)
+
.returning(Future.successful(LeaseGrantResponse.newBuilder().setID(newTestLeaseId).setTTL(testTtl).build()))
+ (mockEtcd
+ .put(_: String, _: String, _: Long))
+ .expects(newTestKey, *, *)
+ .returning(Future.successful(PutResponse.newBuilder().build()))
+
+ val watcher = TestProbe()
+ val service = TestFSMRef(new LeaseKeepAliveService(mockEtcd,
testInstanceId, watcher.ref))
+ service.stateName shouldBe Active
+ service.stateData shouldBe a[ActiveStates]
+ service.stateData match {
+ case ActiveStates(_, lease) => lease shouldBe testLease
+ case _ => fail()
+ }
+ watcher.expectMsg(WatchEndpoint(testKey, testLease.id.toString, false,
watcherName, Set(DeleteEvent)))
+
+ watcher.expectMsg(UnwatchEndpoint(testKey, false, watcherName))
+ Thread.sleep(1500) //wait for the lease to be granted
+
+ service.stateName shouldBe Active
+ service.stateData shouldBe a[ActiveStates]
+ service.stateData match {
+ case ActiveStates(_, lease) => lease shouldBe newTestLease
+ case _ => fail()
+ }
+ watcher.expectMsg(WatchEndpoint(newTestKey, newTestLease.id.toString,
false, watcherName, Set(DeleteEvent)))
+ }
+
+}