This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a03507cee Support graceful shutdown. (#5283)
a03507cee is described below
commit a03507ceeb2c4648c875ed68273de1448ec0d074
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 19 15:01:36 2022 +0900
Support graceful shutdown. (#5283)
---
.../apache/openwhisk/core/containerpool/Container.scala | 14 ++++++++++++++
.../apache/openwhisk/core/entity/ActivationResult.scala | 9 +++++----
.../docker/test/DockerContainerTests.scala | 17 +++++++++++++++++
.../kubernetes/test/KubernetesContainerTests.scala | 16 ++++++++++++++++
4 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index e3b5b5e96..6b557f3a3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -135,6 +135,13 @@ trait Container {
transid.failed(this, start, s"initializiation failed with $t")
}
.flatMap { result =>
+ // if runtime container is shutting down, reschedule the activation message
+ result.response.right.map { res =>
+ if (res.shuttingDown) {
+ throw ContainerHealthError(transid, containerId.asString)
+ }
+ }
+
if (result.ok) {
Future.successful(result.interval)
} else if (result.interval.duration >= timeout) {
@@ -180,6 +187,13 @@ trait Container {
transid.failed(this, start, s"run failed with $t")
}
.map { result =>
+ // if runtime container is shutting down, reschedule the activation message
+ result.response.right.map { res =>
+ if (res.shuttingDown) {
+ throw ContainerHealthError(transid, containerId.asString)
+ }
+ }
+
val response = if (result.interval.duration >= timeout) {
ActivationResponse.developerError(Messages.timedoutActivation(timeout, false))
} else {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
index 6a737d61e..c8a1ddfb2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
@@ -18,12 +18,9 @@
package org.apache.openwhisk.core.entity
import scala.util.Try
-
-import akka.http.scaladsl.model.StatusCodes.OK
-
+import akka.http.scaladsl.model.StatusCodes.{OK, ServiceUnavailable}
import spray.json._
import spray.json.DefaultJsonProtocol
-
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.http.Messages._
@@ -139,6 +136,10 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
/** true iff status code is OK (HTTP 200 status code), anything else is considered an error. **/
val okStatus = statusCode == OK.intValue
val ok = okStatus && truncated.isEmpty
+
+ /** true iff status code is ServiceUnavailable (HTTP 503 status code) */
+ val shuttingDown = statusCode == ServiceUnavailable.intValue
+
override def toString = {
val base = if (okStatus) "ok" else "not ok"
val rest = truncated.map(e => s", truncated ${e.toString}").getOrElse("")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index e70467a67..703c31189 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -474,6 +474,23 @@ class DockerContainerTests
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
}
+ it should "throw ContainerHealthError if runtime container returns 503 response" in {
+ implicit val docker = stub[DockerApiWithFileAccess]
+ implicit val runc = stub[RuncApi]
+
+ val interval = intervalOf(1.millisecond)
+ val result = JsObject.empty
+ val container = dockerContainer() {
+ Future.successful(RunResult(interval, Right(ContainerResponse(503, result.compactPrint, None))))
+ }
+
+ val initResult = container.initialize(JsObject.empty, 1.second, 1)
+ an[ContainerHealthError] should be thrownBy await(initResult)
+
+ val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ an[ContainerHealthError] should be thrownBy await(runResult)
+ }
+
it should "properly deal with a timeout during run" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 0c0b5a093..b994cdd20 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -277,6 +277,22 @@ class KubernetesContainerTests
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
}
+ it should "throw ContainerHealthError if runtime container returns 503 response" in {
+ implicit val kubernetes = stub[KubernetesApi]
+
+ val interval = intervalOf(1.millisecond)
+ val result = JsObject.empty
+ val container = kubernetesContainer() {
+ Future.successful(RunResult(interval, Right(ContainerResponse(503, result.compactPrint, None))))
+ }
+
+ val initResult = container.initialize(JsObject.empty, 1.second, 1)
+ an[ContainerHealthError] should be thrownBy await(initResult)
+
+ val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
+ an[ContainerHealthError] should be thrownBy await(runResult)
+ }
+
it should "properly deal with a timeout during run" in {
implicit val kubernetes = stub[KubernetesApi]