diff --git a/common/scala/src/main/scala/whisk/common/Scheduler.scala
b/common/scala/src/main/scala/whisk/common/Scheduler.scala
index 28d150260e..bf73ad7d66 100644
--- a/common/scala/src/main/scala/whisk/common/Scheduler.scala
+++ b/common/scala/src/main/scala/whisk/common/Scheduler.scala
@@ -22,11 +22,7 @@ import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
-import akka.actor.Actor
-import akka.actor.ActorSystem
-import akka.actor.Cancellable
-import akka.actor.Props
+import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
/**
* Scheduler utility functions to execute tasks in a repetitive way with
controllable behavior
@@ -99,7 +95,7 @@ object Scheduler {
name: String = "Scheduler")(f: () =>
Future[Any])(implicit system: ActorSystem,
logging: Logging,
transid: TransactionId =
-
TransactionId.unknown) = {
+
TransactionId.unknown): ActorRef = {
require(interval > Duration.Zero)
system.actorOf(Props(new Worker(initialDelay, interval, false, name, f)))
}
diff --git
a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e4a9..753c19a36d 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -60,6 +60,7 @@ object MessageFeed {
protected[connector] case object Idle extends FeedState
protected[connector] case object FillingPipeline extends FeedState
protected[connector] case object DrainingPipeline extends FeedState
+ protected[connector] case object GracefulShutdownDrain extends FeedState
protected sealed trait FeedData
private case object NoData extends FeedData
@@ -70,6 +71,10 @@ object MessageFeed {
/** Steady state message, indicates capacity in downstream process to
receive more messages. */
object Processed
+ object GracefulShutdown
+
+ object Busy
+
/** Indicates the fill operation has completed. */
private case class FillCompleted(messages: Seq[(String, Int, Long,
Array[Byte])])
}
@@ -117,6 +122,7 @@ class MessageFeed(description: String,
// Best practice dictates a mutable variable pointing at an immutable
collection for this reason
private var outstandingMessages = immutable.Queue.empty[(String, Int, Long,
Array[Byte])]
private var handlerCapacity = maximumHandlerCapacity
+ private var fillingQueue = false
private implicit val tid = TransactionId.dispatcher
@@ -126,6 +132,7 @@ class MessageFeed(description: String,
when(Idle) {
case Event(Ready, _) =>
+ fillingQueue = true
fillPipeline()
goto(FillingPipeline)
@@ -143,14 +150,19 @@ class MessageFeed(description: String,
case Event(FillCompleted(messages), _) =>
outstandingMessages = outstandingMessages ++ messages
sendOutstandingMessages()
+ fillingQueue = false
if (shouldFillQueue()) {
+ fillingQueue = true
fillPipeline()
stay
} else {
goto(DrainingPipeline)
}
+ case Event(GracefulShutdown, _) =>
+ goto(GracefulShutdownDrain)
+
case _ => stay
}
@@ -158,11 +170,33 @@ class MessageFeed(description: String,
case Event(Processed, _) =>
updateHandlerCapacity()
sendOutstandingMessages()
+
if (shouldFillQueue()) {
+ fillingQueue = true
fillPipeline()
goto(FillingPipeline)
} else stay
+ case Event(GracefulShutdown, _) =>
+ goto(GracefulShutdownDrain)
+
+ case _ => stay
+ }
+
+ // Shutdown drain: none of the handlers below requests a new fill; the feed only hands
+ // out what it already holds and answers Busy queries about the work that remains.
+ when(GracefulShutdownDrain) {
+   case Event(Processed, _) =>
+     updateHandlerCapacity()
+     sendOutstandingMessages()
+     stay
+
+   case Event(FillCompleted(messages), _) =>
+     // A fill requested before the shutdown signal has now landed, so it is no longer
+     // pending. Without this reset the flag would stay true for the rest of the drain.
+     fillingQueue = false
+     outstandingMessages = outstandingMessages ++ messages
+     sendOutstandingMessages()
+     stay
+
+   case Event(Busy, _) =>
+     // The feed is busy while ANY work remains: handler capacity has not returned to its
+     // maximum, messages are still queued, or a fill is still pending. Requiring all
+     // three at once (&&) would report idle while buffered messages are still waiting.
+     stay() replying (handlerCapacity != maximumHandlerCapacity || outstandingMessages.nonEmpty || fillingQueue)
+
   case _ => stay
 }
@@ -207,7 +241,7 @@ class MessageFeed(description: String,
val occupancy = outstandingMessages.size
if (occupancy > 0 && handlerCapacity > 0) {
// Easiest way with an immutable queue to cleanly dequeue
- // Head is the first elemeent of the queue, desugared w/ an assignment
pattern
+ // Head is the first element of the queue, desugared w/ an assignment
pattern
// Tail is everything but the first element, thus mutating the
collection variable
val (topic, partition, offset, bytes) = outstandingMessages.head
outstandingMessages = outstandingMessages.tail
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index d26ebdc2f9..70a3b98855 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -139,7 +139,7 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
val retryLogDeadline = if (isErrorLogged) {
logging.error(
this,
- s"Rescheduling Run message, too many message in the pool,
freePoolSize: ${freePool.size}, " +
+ s"Rescheduling Run message, too many messages in the pool,
freePoolSize: ${freePool.size}, " +
s"busyPoolSize: ${busyPool.size}, maxActiveContainers
${poolConfig.maxActiveContainers}, " +
s"userNamespace: ${r.msg.user.namespace}, action:
${r.action}")(r.msg.transid)
Some(logMessageInterval.fromNow)
@@ -178,6 +178,9 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
+
+ case Busy =>
+ sender ! (busyPool.nonEmpty)
}
/** Creates a new container and updates state accordingly. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3cf99..32cdc4a06e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -20,22 +20,18 @@ package whisk.core.invoker
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
-import scala.util.Failure
import scala.util.Try
import kamon.Kamon
import org.apache.curator.retry.RetryUntilElapsed
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.shared.SharedCount
import akka.Done
-import akka.actor.ActorSystem
-import akka.actor.CoordinatedShutdown
+import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.stream.ActorMaterializer
import whisk.common.AkkaLogging
-import whisk.common.Scheduler
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
import whisk.core.connector.MessagingProvider
-import whisk.core.connector.PingMessage
import whisk.core.entity._
import whisk.core.entity.ExecManifest
import whisk.core.entity.InstanceId
@@ -178,15 +174,10 @@ object Invoker {
case e: Exception => abort(s"Failed to initialize reactive invoker:
${e.getMessage}")
}
- Scheduler.scheduleWaitAtMost(1.seconds)(() => {
- producer.send("health", PingMessage(invokerInstance)).andThen {
- case Failure(t) => logger.error(this, s"failed to ping the controller:
$t")
- }
- })
-
val port = config.servicePort.toInt
BasicHttpService.startHttpService(new BasicRasService {}.route, port)(
actorSystem,
ActorMaterializer.create(actorSystem))
+
}
}
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 20cbbd46cd..5b0335817f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -20,12 +20,16 @@ package whisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
+import akka.pattern.{after, ask, gracefulStop}
+import akka.util.Timeout
+
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json._
+
import whisk.common._
import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
@@ -36,10 +40,13 @@ import whisk.core.entity._
import whisk.http.Messages
import whisk.spi.SpiLoader
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
+import scala.language.postfixOps
+
import DefaultJsonProtocol._
+import sun.misc.{Signal, SignalHandler}
class InvokerReactive(
config: WhiskConfig,
@@ -76,7 +83,6 @@ class InvokerReactive(
"--ulimit" -> Set("nofile=1024:1024"),
"--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
containerFactory.init()
- sys.addShutdownHook(containerFactory.cleanup())
/** Initialize needed databases */
private val entityStore = WhiskEntityStore.datastore()
@@ -284,4 +290,84 @@ class InvokerReactive(
})
}
+ // Recurring health ping: each run sends a PingMessage for this instance to the "health"
+ // topic and only logs an error if the send fails. The returned handle is what the signal
+ // handlers in this class pass to gracefulStop.
+ // NOTE(review): presumably runs at one second intervals -- confirm against Scheduler.scheduleWaitAtMost.
+ val healthScheduler = Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+ producer.send("health", PingMessage(instance)).andThen {
+ case Failure(t) => logging.error(this, s"failed to ping the
controller(s): $t")
+ }
+ })
+
+ /**
+  * Repeatedly asks the pool whether it is busy until it answers that it is not.
+  *
+  * @param pool the actor that is sent `Busy` and is expected to reply with a Boolean
+  * @return a future which completes once the pool has replied `false`
+  */
+ def waitForContainerPoolIdle(pool: ActorRef): Future[Unit] = {
+   implicit val askTimeout = Timeout(5.seconds)
+   val pollInterval = 1.second
+
+   // `after` takes its future by name, so the next poll is only issued after the interval.
+   def pollAgain: Future[Unit] =
+     after(pollInterval, actorSystem.scheduler)(waitForContainerPoolIdle(pool))
+
+   (pool ? Busy)
+     .mapTo[Boolean]
+     .flatMap { stillBusy =>
+       if (stillBusy) {
+         logging.info(this, "Container pool is not idle.")
+         pollAgain
+       } else {
+         Future.successful(())
+       }
+     }
+     // A failed or timed-out ask is treated like a busy answer: wait and poll again.
+     .recoverWith { case _ => pollAgain }
+ }
+
+ /**
+  * Tells the feed to shut down gracefully, then polls its status until it reports idle.
+  *
+  * The shutdown request is re-sent on every poll, to the same actor that is being polled.
+  *
+  * @param feed the feed actor to signal and poll; it is expected to answer `MessageFeed.Busy` with a Boolean
+  * @return a future which completes once the feed has replied `false`
+  */
+ def waitForActivationFeedIdle(feed: ActorRef): Future[Unit] = {
+   implicit val timeout = Timeout(5.seconds)
+   val delay = 1.second
+
+   // Signal the actor passed in, not the enclosing `activationFeed` field, so the actor
+   // that is asked for its status is the one that was told to drain.
+   feed ! MessageFeed.GracefulShutdown
+   (feed ? MessageFeed.Busy)
+     .mapTo[Boolean]
+     .flatMap {
+       case true =>
+         logging.info(this, "Activation feed is not idle.")
+         after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed))
+       case false =>
+         Future.successful(())
+     }
+     // A failed or timed-out ask is treated like a busy answer: wait and poll again.
+     .recoverWith { case _ => after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed)) }
+ }
+
+ // Capture SIGTERM signals to gracefully shut down the invoker. When gracefully shutting
+ // down, the health scheduler is stopped, preventing additional actions from being
+ // scheduled to the invoker; then the invoker processes its buffered messages from the
+ // activation feed and waits for its user containers to finish running before the
+ // process exits.
+ Signal.handle(new Signal("TERM"), new SignalHandler() {
+ override def handle(signal: Signal) = {
+ logging.info(this, s"Starting graceful shutdown")
+
+ // Order is important here: the for-comprehension chains the futures so they run
+ // sequentially, each step starting only after the previous one has completed.
+ val shutdowns = for {
+ _ <- gracefulStop(healthScheduler, 5.seconds)
+ _ <- waitForActivationFeedIdle(activationFeed)
+ _ <- waitForContainerPoolIdle(pool)
+ } yield {
+ logging.info(this, "Successfully shutdown health scheduler, activation
feed, and container pool")
+ }
+
+ // Allow the shutdown to take a maximum of 3 times the maximum action runtime, since
+ // the feed can be buffered and we want to allow for some grace period. If a graceful
+ // shutdown is not successful, Await.result throws, the cleanup and exit below are
+ // skipped, the invoker continues running and a graceful shutdown can be attempted again.
+ // NOTE(review): the health scheduler may already be stopped at that point -- confirm
+ // that a retry is the intended recovery.
+ Await.result(shutdowns, TimeLimit.MAX_DURATION * 3)
+ containerFactory.cleanup()
+ logging.info(this, "Shutting down invoker")
+ System.exit(0)
+ }
+ })
+
+ // Capture SIGUSR2 signals to put the invoker into drain mode. When draining, the health
+ // scheduler is stopped, preventing additional actions from being scheduled to the
+ // invoker and allowing the invoker to process its current queue. The process itself
+ // keeps running; nothing here exits it.
+ Signal.handle(new Signal("USR2"), new SignalHandler() {
+   // The result type is spelled out because the body ends in a Future: left to inference,
+   // the method's type would not match the Unit result that SignalHandler.handle declares.
+   override def handle(signal: Signal): Unit = {
+     logging.info(this, "Draining invoker")
+
+     // Fire and forget: the handler does not wait for the stop to complete, and a failure
+     // to stop within the timeout is only logged.
+     gracefulStop(healthScheduler, 5.seconds).recover {
+       case _ => logging.info(this, "Health communication failed to shutdown gracefully")
+     }
+   }
+ })
+
}
With regards,
Apache Git Services