This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0d22b99 Make REST communication with action containers more robust. (#3710)
0d22b99 is described below
commit 0d22b9976e22bbe37bd77ba5afd3836870870d52
Author: Sven Lange-Last <[email protected]>
AuthorDate: Tue May 29 15:49:52 2018 +0200
Make REST communication with action containers more robust. (#3710)
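On systems with high load, POSTing to the action container's `/init` endpoint
occasionally fails with `NoRouteToHostException`. Retry the request if this
exception occurs: it signals a transient condition in which no data has been
transmitted yet, so retrying is safe.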
---
.../scala/whisk/core/containerpool/HttpUtils.scala | 60 +++++++++++++++-------
.../scala/whisk/core/entity/ActivationResult.scala | 2 +-
.../scala/actionContainers/ActionContainer.scala | 20 +++++---
.../docker/test/ContainerConnectionTests.scala | 14 +++--
.../docker/test/DockerContainerTests.scala | 4 +-
.../kubernetes/test/KubernetesContainerTests.scala | 4 +-
.../core/entity/test/ActivationResponseTests.scala | 4 +-
7 files changed, 71 insertions(+), 37 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37f..07c6fc7 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -17,14 +17,15 @@
package whisk.core.containerpool
+import java.net.NoRouteToHostException
import java.nio.charset.StandardCharsets
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import org.apache.commons.io.IOUtils
import org.apache.http.HttpHeaders
import org.apache.http.client.config.RequestConfig
@@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder
import org.apache.http.conn.HttpHostConnectException
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
-
import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.ActivationResponse._
import whisk.core.entity.ByteSize
import whisk.core.entity.size.SizeLong
@@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong
* @param timeout the timeout in msecs to wait for a response
* @param maxResponse the maximum size in bytes the connection will accept
*/
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration,
maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration,
maxResponse: ByteSize)(
+ implicit logging: Logging) {
def close() = Try(connection.close())
@@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout:
FiniteDuration, maxRe
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8
String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean):
Either[ContainerHttpError, ContainerResponse] = {
+ def post(endpoint: String, body: JsValue, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError,
ContainerResponse] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout:
FiniteDuration, maxRe
request.addHeader(HttpHeaders.ACCEPT, "application/json")
request.setEntity(entity)
- execute(request, timeout.toMillis.toInt, retry)
+ execute(request, timeout, retry)
}
- private def execute(request: HttpRequestBase,
- timeoutMsec: Integer,
- retry: Boolean): Either[ContainerHttpError,
ContainerResponse] = {
+ // Used internally to wrap all exceptions for which the request can be
retried
+ private case class RetryableConnectionError(t: Throwable) extends
Exception(t) with NoStackTrace
+
+ // Annotation will make the compiler complain if no tail recursion is
possible
+ @tailrec private def execute(request: HttpRequestBase, timeout:
FiniteDuration, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError,
ContainerResponse] = {
Try(connection.execute(request)).map { response =>
val containerResponse = Option(response.getEntity)
.map { entity =>
@@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String,
timeout: FiniteDuration, maxRe
response.close()
containerResponse
+ } recoverWith {
+ // The route to target socket as well as the target socket itself may
need some time to be available -
+ // particularly on a loaded system.
+ // The following exceptions occur on such transient conditions. In
addition, no data has been transmitted
+ // yet if these exceptions occur. For this reason, it is safe and
reasonable to retry.
+ //
+ // HttpHostConnectException: no target socket is listening (yet).
+ case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
+ //
+ // NoRouteToHostException: route to target host is not known (yet).
+ case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
} match {
- case Success(r) => r
- case Failure(t: HttpHostConnectException) if retry =>
- if (timeoutMsec > 0) {
- Thread sleep 100
- val newTimeout = timeoutMsec - 100
- execute(request, newTimeout, retry)
+ case Success(response) => response
+ case Failure(t: RetryableConnectionError) if retry =>
+ val sleepTime = 10.milliseconds
+ if (timeout > Duration.Zero) {
+ logging.info(this, s"POST failed with ${t} - retrying after sleeping
${sleepTime}.")
+ Thread.sleep(sleepTime.toMillis)
+ val newTimeout = timeout - sleepTime
+ execute(request, newTimeout, retry = true)
} else {
- Left(Timeout())
+ logging.warn(this, s"POST failed with ${t} - no retry because
timeout exceeded.")
+ Left(Timeout(t))
}
case Failure(t: Throwable) => Left(ConnectionError(t))
}
@@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String,
timeout: FiniteDuration, maxRe
object HttpUtils {
/** A helper method to post one single request to a connection. Used for
container tests. */
- def post(host: String, port: Int, endPoint: String, content: JsValue): (Int,
Option[JsObject]) = {
+ def post(host: String, port: Int, endPoint: String, content:
JsValue)(implicit logging: Logging,
+ tid:
TransactionId): (Int, Option[JsObject]) = {
val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
val response = connection.post(endPoint, content, retry = true)
connection.close()
response match {
case Right(r) => (r.statusCode,
Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no
response from container")
- case Left(Timeout()) => throw new
java.util.concurrent.TimeoutException()
+ case Left(Timeout(_)) => throw new
java.util.concurrent.TimeoutException()
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new
IllegalStateException(t.getMessage)
diff --git
a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 85a9f36..5f8c815 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends
DefaultJsonProtocol {
protected[core] sealed trait ContainerHttpError extends
ContainerConnectionError
protected[core] case class ConnectionError(t: Throwable) extends
ContainerHttpError
protected[core] case class NoResponseReceived() extends ContainerHttpError
- protected[core] case class Timeout() extends ContainerHttpError
+ protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
protected[core] case class MemoryExhausted() extends ContainerConnectionError
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala
b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 3ee1f47..97915c8 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger
import scala.sys.process.stringToProcess
import scala.util.Random
import scala.util.{Failure, Success}
-
import org.apache.commons.lang3.StringUtils
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-
+import org.scalatest.{FlatSpec, Matchers}
import akka.actor.ActorSystem
import spray.json._
+import common.StreamLogging
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.Exec
/**
@@ -49,7 +49,7 @@ trait ActionContainer {
def run(value: JsValue): (Int, Option[JsObject])
}
-trait ActionProxyContainerTestUtils extends FlatSpec with Matchers {
+trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with
StreamLogging {
import ActionContainer.{filterSentinel, sentinel}
def initPayload(code: String, main: String = "main"): JsObject =
@@ -149,8 +149,8 @@ object ActionContainer {
val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
- def withContainer(imageName: String, environment: Map[String, String] =
Map.empty)(code: ActionContainer => Unit)(
- implicit actorSystem: ActorSystem): (String, String) = {
+ def withContainer(imageName: String, environment: Map[String, String] =
Map.empty)(
+ code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging:
Logging): (String, String) = {
val rand = { val r = Random.nextInt; if (r < 0) -r else r }
val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand
val envArgs = environment.toSeq
@@ -204,7 +204,11 @@ object ActionContainer {
}
}
- private def syncPost(host: String, port: Int, endPoint: String, content:
JsValue): (Int, Option[JsObject]) = {
+ private def syncPost(host: String, port: Int, endPoint: String, content:
JsValue)(
+ implicit logging: Logging): (Int, Option[JsObject]) = {
+
+ implicit val transid = TransactionId.testing
+
whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a..9575422 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.concurrent.duration._
-
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
import org.apache.http.entity.StringEntity
@@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-
import spray.json.JsObject
+import common.StreamLogging
+import whisk.common.TransactionId
import whisk.core.containerpool.HttpUtils
import whisk.core.entity.size._
import whisk.core.entity.ActivationResponse._
@@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._
* Unit tests for HttpUtils which communicate with containers.
*/
@RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests extends FlatSpec with Matchers with
BeforeAndAfter with BeforeAndAfterAll {
+class ContainerConnectionTests
+ extends FlatSpec
+ with Matchers
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val transid = TransactionId.testing
var testHang: FiniteDuration = 0.second
var testStatusCode: Int = 200
@@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers
with BeforeAndAfte
testHang = 0.second
testStatusCode = 200
testResponse = null
+ stream.reset()
}
override def afterAll = {
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c9..b3aa2b0 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -383,7 +383,7 @@ class DockerContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -434,7 +434,7 @@ class DockerContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba68..a6e5d20 100644
---
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -235,7 +235,7 @@ class KubernetesContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -284,7 +284,7 @@ class KubernetesContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git
a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index 6e38877..78dd296 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
}
it should "interpret failed init that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new
Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processInitResponseContent(e, logger)
@@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with
Matchers {
}
it should "interpret failed run that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new
Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processRunResponseContent(e, logger)
--
To stop receiving notification emails like this one, please contact
[email protected].