This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a03507cee Support graceful shutdown. (#5283)
a03507cee is described below

commit a03507ceeb2c4648c875ed68273de1448ec0d074
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 19 15:01:36 2022 +0900

    Support graceful shutdown. (#5283)
---
 .../apache/openwhisk/core/containerpool/Container.scala | 14 ++++++++++++++
 .../apache/openwhisk/core/entity/ActivationResult.scala |  9 +++++----
 .../docker/test/DockerContainerTests.scala              | 17 +++++++++++++++++
 .../kubernetes/test/KubernetesContainerTests.scala      | 16 ++++++++++++++++
 4 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index e3b5b5e96..6b557f3a3 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -135,6 +135,13 @@ trait Container {
           transid.failed(this, start, s"initializiation failed with $t")
       }
       .flatMap { result =>
+        // if runtime container is shutting down, reschedule the activation 
message
+        result.response.right.map { res =>
+          if (res.shuttingDown) {
+            throw ContainerHealthError(transid, containerId.asString)
+          }
+        }
+
         if (result.ok) {
           Future.successful(result.interval)
         } else if (result.interval.duration >= timeout) {
@@ -180,6 +187,13 @@ trait Container {
           transid.failed(this, start, s"run failed with $t")
       }
       .map { result =>
+        // if runtime container is shutting down, reschedule the activation 
message
+        result.response.right.map { res =>
+          if (res.shuttingDown) {
+            throw ContainerHealthError(transid, containerId.asString)
+          }
+        }
+
         val response = if (result.interval.duration >= timeout) {
           
ActivationResponse.developerError(Messages.timedoutActivation(timeout, false))
         } else {
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
index 6a737d61e..c8a1ddfb2 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
@@ -18,12 +18,9 @@
 package org.apache.openwhisk.core.entity
 
 import scala.util.Try
-
-import akka.http.scaladsl.model.StatusCodes.OK
-
+import akka.http.scaladsl.model.StatusCodes.{OK, ServiceUnavailable}
 import spray.json._
 import spray.json.DefaultJsonProtocol
-
 import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.http.Messages._
 
@@ -139,6 +136,10 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
     /** true iff status code is OK (HTTP 200 status code), anything else is 
considered an error. **/
     val okStatus = statusCode == OK.intValue
     val ok = okStatus && truncated.isEmpty
+
+    /** true iff status code is ServiceUnavailable (HTTP 503 status code) */
+    val shuttingDown = statusCode == ServiceUnavailable.intValue
+
     override def toString = {
       val base = if (okStatus) "ok" else "not ok"
       val rest = truncated.map(e => s", truncated ${e.toString}").getOrElse("")
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index e70467a67..703c31189 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -474,6 +474,23 @@ class DockerContainerTests
     end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
   }
 
+  it should "throw ContainerHealthError if runtime container returns 503 
response" in {
+    implicit val docker = stub[DockerApiWithFileAccess]
+    implicit val runc = stub[RuncApi]
+
+    val interval = intervalOf(1.millisecond)
+    val result = JsObject.empty
+    val container = dockerContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(503, 
result.compactPrint, None))))
+    }
+
+    val initResult = container.initialize(JsObject.empty, 1.second, 1)
+    an[ContainerHealthError] should be thrownBy await(initResult)
+
+    val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+    an[ContainerHealthError] should be thrownBy await(runResult)
+  }
+
   it should "properly deal with a timeout during run" in {
     implicit val docker = stub[DockerApiWithFileAccess]
     implicit val runc = stub[RuncApi]
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 0c0b5a093..b994cdd20 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -277,6 +277,22 @@ class KubernetesContainerTests
     end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
   }
 
+  it should "throw ContainerHealthError if runtime container returns 503 
response" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val interval = intervalOf(1.millisecond)
+    val result = JsObject.empty
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(503, 
result.compactPrint, None))))
+    }
+
+    val initResult = container.initialize(JsObject.empty, 1.second, 1)
+    an[ContainerHealthError] should be thrownBy await(initResult)
+
+    val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+    an[ContainerHealthError] should be thrownBy await(runResult)
+  }
+
   it should "properly deal with a timeout during run" in {
     implicit val kubernetes = stub[KubernetesApi]
 

Reply via email to