chetanmeh commented on a change in pull request #3704: Invoker graceful 
shutdown and drain mode
URL: 
https://github.com/apache/incubator-openwhisk/pull/3704#discussion_r194017675
 
 

 ##########
 File path: core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
 ##########
 @@ -284,4 +290,93 @@ class InvokerReactive(
       })
   }
 
+  val healthScheduler = Scheduler.scheduleWaitAtMost(1.seconds)(() => {
+    producer.send("health", PingMessage(instance)).andThen {
+      case Failure(t) => logging.error(this, s"failed to ping the 
controller(s): $t")
+    }
+  })
+
+  /** Polls the pool's status and returns a future which completes once the 
pool is idle. */
+  def waitForContainerPoolIdle(pool: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    (pool ? Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Container pool is not idle.")
+          after(delay, actorSystem.scheduler)(waitForContainerPoolIdle(pool))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, 
actorSystem.scheduler)(waitForContainerPoolIdle(pool)) }
+  }
+
+  /** Polls the feed's status and returns a future which completes once the 
feed is idle. */
+  def waitForActivationFeedIdle(feed: ActorRef): Future[Unit] = {
+    implicit val timeout = Timeout(5 seconds)
+    val delay = 1.second
+
+    activationFeed ! MessageFeed.GracefulShutdown
+    (feed ? MessageFeed.Busy)
+      .mapTo[Boolean]
+      .flatMap {
+        case true =>
+          logging.info(this, "Activation feed is not idle.")
+          after(delay, actorSystem.scheduler)(waitForActivationFeedIdle(feed))
+        case false =>
+          Future.successful(())
+      }
+      .recoverWith { case _ => after(delay, 
actorSystem.scheduler)(waitForActivationFeedIdle(feed)) }
+  }
+
+  // Capture SIGTERM signals to gracefully shut down the invoker. When 
gracefully shutting down, the health scheduler is
+  // shut down, preventing additional actions from being scheduled to the 
invoker, then the invoker processes its buffered
+  // messages from the activation feed, and waits for its user containers to 
finish running before the process exits.
+  Signal.handle(
+    new Signal("TERM"),
+    new SignalHandler() {
+      override def handle(signal: Signal) = {
+        logging.info(this, s"Starting graceful shutdown")
+
+        // Order is important here so futures are run sequentially
+        val shutdowns = for {
+          _ <- gracefulStop(healthScheduler, 5.seconds).recover {
+            case _ => logging.info(this, "Health communication failed to 
shutdown gracefully")
+          }
+          _ <- waitForActivationFeedIdle(activationFeed)
+          _ <- waitForContainerPoolIdle(pool)
+          _ <- gracefulStop(activationFeed, 5.seconds).recover {
+            case _ => logging.info(this, "Activation feed failed to shutdown 
gracefully")
+          }
+        } yield {
+          logging.info(this, "Successfully shutdown health scheduler, 
activation feed, and container pool")
+        }
+
+        try {
+          // Allow the shutdown to take a maximum of 3 times the maximum 
action runtime since the feed can be
+          // buffered and we want to allow for some grace period.
+          Await.result(shutdowns, TimeLimit.MAX_DURATION * 3)
 
 Review comment:
   Maybe we should return a different exit code in case of an unclean shutdown

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to