markusthoemmes closed pull request #3710: Make REST communication with action
containers more robust
URL: https://github.com/apache/incubator-openwhisk/pull/3710
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index e0fd37ff41..07c6fc7e5b 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -17,14 +17,15 @@
package whisk.core.containerpool
+import java.net.NoRouteToHostException
import java.nio.charset.StandardCharsets
-import scala.concurrent.duration.DurationInt
-import scala.concurrent.duration.FiniteDuration
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import org.apache.commons.io.IOUtils
import org.apache.http.HttpHeaders
import org.apache.http.client.config.RequestConfig
@@ -34,8 +35,9 @@ import org.apache.http.client.utils.URIBuilder
import org.apache.http.conn.HttpHostConnectException
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
-
import spray.json._
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.ActivationResponse._
import whisk.core.entity.ByteSize
import whisk.core.entity.size.SizeLong
@@ -52,7 +54,8 @@ import whisk.core.entity.size.SizeLong
* @param timeout the timeout in msecs to wait for a response
* @param maxResponse the maximum size in bytes the connection will accept
*/
-protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration,
maxResponse: ByteSize) {
+protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration,
maxResponse: ByteSize)(
+ implicit logging: Logging) {
def close() = Try(connection.close())
@@ -68,7 +71,8 @@ protected[core] class HttpUtils(hostname: String, timeout:
FiniteDuration, maxRe
* @param retry whether or not to retry on connection failure
* @return Left(Error Message) or Right(Status Code, Response as UTF-8
String)
*/
- def post(endpoint: String, body: JsValue, retry: Boolean):
Either[ContainerHttpError, ContainerResponse] = {
+ def post(endpoint: String, body: JsValue, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError,
ContainerResponse] = {
val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
entity.setContentType("application/json")
@@ -76,12 +80,15 @@ protected[core] class HttpUtils(hostname: String, timeout:
FiniteDuration, maxRe
request.addHeader(HttpHeaders.ACCEPT, "application/json")
request.setEntity(entity)
- execute(request, timeout.toMillis.toInt, retry)
+ execute(request, timeout, retry)
}
- private def execute(request: HttpRequestBase,
- timeoutMsec: Integer,
- retry: Boolean): Either[ContainerHttpError,
ContainerResponse] = {
+ // Used internally to wrap all exceptions for which the request can be
retried
+ private case class RetryableConnectionError(t: Throwable) extends
Exception(t) with NoStackTrace
+
+ // Annotation will make the compiler complain if no tail recursion is
possible
+ @tailrec private def execute(request: HttpRequestBase, timeout:
FiniteDuration, retry: Boolean)(
+ implicit tid: TransactionId): Either[ContainerHttpError,
ContainerResponse] = {
Try(connection.execute(request)).map { response =>
val containerResponse = Option(response.getEntity)
.map { entity =>
@@ -105,15 +112,29 @@ protected[core] class HttpUtils(hostname: String,
timeout: FiniteDuration, maxRe
response.close()
containerResponse
+ } recoverWith {
+ // The route to target socket as well as the target socket itself may
need some time to be available -
+ // particularly on a loaded system.
+ // The following exceptions occur on such transient conditions. In
addition, no data has been transmitted
+ // yet if these exceptions occur. For this reason, it is safe and
reasonable to retry.
+ //
+ // HttpHostConnectException: no target socket is listening (yet).
+ case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
+ //
+ // NoRouteToHostException: route to target host is not known (yet).
+ case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
} match {
- case Success(r) => r
- case Failure(t: HttpHostConnectException) if retry =>
- if (timeoutMsec > 0) {
- Thread sleep 100
- val newTimeout = timeoutMsec - 100
- execute(request, newTimeout, retry)
+ case Success(response) => response
+ case Failure(t: RetryableConnectionError) if retry =>
+ val sleepTime = 10.milliseconds
+ if (timeout > Duration.Zero) {
+ logging.info(this, s"POST failed with ${t} - retrying after sleeping
${sleepTime}.")
+ Thread.sleep(sleepTime.toMillis)
+ val newTimeout = timeout - sleepTime
+ execute(request, newTimeout, retry = true)
} else {
- Left(Timeout())
+ logging.warn(this, s"POST failed with ${t} - no retry because
timeout exceeded.")
+ Left(Timeout(t))
}
case Failure(t: Throwable) => Left(ConnectionError(t))
}
@@ -141,14 +162,15 @@ protected[core] class HttpUtils(hostname: String,
timeout: FiniteDuration, maxRe
object HttpUtils {
/** A helper method to post one single request to a connection. Used for
container tests. */
- def post(host: String, port: Int, endPoint: String, content: JsValue): (Int,
Option[JsObject]) = {
+ def post(host: String, port: Int, endPoint: String, content:
JsValue)(implicit logging: Logging,
+ tid:
TransactionId): (Int, Option[JsObject]) = {
val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB)
val response = connection.post(endPoint, content, retry = true)
connection.close()
response match {
case Right(r) => (r.statusCode,
Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no
response from container")
- case Left(Timeout()) => throw new
java.util.concurrent.TimeoutException()
+ case Left(Timeout(_)) => throw new
java.util.concurrent.TimeoutException()
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new
IllegalStateException(t.getMessage)
diff --git
a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 85a9f36015..5f8c815a7f 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -99,7 +99,7 @@ protected[core] object ActivationResponse extends
DefaultJsonProtocol {
protected[core] sealed trait ContainerHttpError extends
ContainerConnectionError
protected[core] case class ConnectionError(t: Throwable) extends
ContainerHttpError
protected[core] case class NoResponseReceived() extends ContainerHttpError
- protected[core] case class Timeout() extends ContainerHttpError
+ protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
protected[core] case class MemoryExhausted() extends ContainerConnectionError
diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala
b/tests/src/test/scala/actionContainers/ActionContainer.scala
index 3ee1f47314..97915c8aad 100644
--- a/tests/src/test/scala/actionContainers/ActionContainer.scala
+++ b/tests/src/test/scala/actionContainers/ActionContainer.scala
@@ -31,13 +31,13 @@ import scala.sys.process.ProcessLogger
import scala.sys.process.stringToProcess
import scala.util.Random
import scala.util.{Failure, Success}
-
import org.apache.commons.lang3.StringUtils
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-
+import org.scalatest.{FlatSpec, Matchers}
import akka.actor.ActorSystem
import spray.json._
+import common.StreamLogging
+import whisk.common.Logging
+import whisk.common.TransactionId
import whisk.core.entity.Exec
/**
@@ -49,7 +49,7 @@ trait ActionContainer {
def run(value: JsValue): (Int, Option[JsObject])
}
-trait ActionProxyContainerTestUtils extends FlatSpec with Matchers {
+trait ActionProxyContainerTestUtils extends FlatSpec with Matchers with
StreamLogging {
import ActionContainer.{filterSentinel, sentinel}
def initPayload(code: String, main: String = "main"): JsObject =
@@ -149,8 +149,8 @@ object ActionContainer {
val sentinel = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
def filterSentinel(str: String): String = str.replaceAll(sentinel, "").trim
- def withContainer(imageName: String, environment: Map[String, String] =
Map.empty)(code: ActionContainer => Unit)(
- implicit actorSystem: ActorSystem): (String, String) = {
+ def withContainer(imageName: String, environment: Map[String, String] =
Map.empty)(
+ code: ActionContainer => Unit)(implicit actorSystem: ActorSystem, logging:
Logging): (String, String) = {
val rand = { val r = Random.nextInt; if (r < 0) -r else r }
val name = imageName.toLowerCase.replaceAll("""[^a-z]""", "") + rand
val envArgs = environment.toSeq
@@ -204,7 +204,11 @@ object ActionContainer {
}
}
- private def syncPost(host: String, port: Int, endPoint: String, content:
JsValue): (Int, Option[JsObject]) = {
+ private def syncPost(host: String, port: Int, endPoint: String, content:
JsValue)(
+ implicit logging: Logging): (Int, Option[JsObject]) = {
+
+ implicit val transid = TransactionId.testing
+
whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content)
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
index 0e0867a25d..95754220b2 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.concurrent.duration._
-
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
import org.apache.http.entity.StringEntity
@@ -34,8 +33,9 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-
import spray.json.JsObject
+import common.StreamLogging
+import whisk.common.TransactionId
import whisk.core.containerpool.HttpUtils
import whisk.core.entity.size._
import whisk.core.entity.ActivationResponse._
@@ -44,7 +44,14 @@ import whisk.core.entity.ActivationResponse._
* Unit tests for HttpUtils which communicate with containers.
*/
@RunWith(classOf[JUnitRunner])
-class ContainerConnectionTests extends FlatSpec with Matchers with
BeforeAndAfter with BeforeAndAfterAll {
+class ContainerConnectionTests
+ extends FlatSpec
+ with Matchers
+ with BeforeAndAfter
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val transid = TransactionId.testing
var testHang: FiniteDuration = 0.second
var testStatusCode: Int = 200
@@ -75,6 +82,7 @@ class ContainerConnectionTests extends FlatSpec with Matchers
with BeforeAndAfte
testHang = 0.second
testStatusCode = 200
testResponse = null
+ stream.reset()
}
override def afterAll = {
diff --git
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 25bc0c980c..b3aa2b0227 100644
---
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -383,7 +383,7 @@ class DockerContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -434,7 +434,7 @@ class DockerContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = dockerContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index fc3ba6805f..a6e5d20f57 100644
---
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -235,7 +235,7 @@ class KubernetesContainerTests
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject(), initTimeout)
@@ -284,7 +284,7 @@ class KubernetesContainerTests
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = kubernetesContainer() {
- Future.successful(RunResult(interval, Left(Timeout())))
+ Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject(), JsObject(), runTimeout)
diff --git
a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
index 6e388773d1..78dd29642f 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ActivationResponseTests.scala
@@ -58,7 +58,7 @@ class ActivationResponseTests extends FlatSpec with Matchers {
}
it should "interpret failed init that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new
Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processInitResponseContent(e, logger)
@@ -122,7 +122,7 @@ class ActivationResponseTests extends FlatSpec with
Matchers {
}
it should "interpret failed run that does not response" in {
- Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout())
+ Seq(ConnectionError(new Throwable()), NoResponseReceived(), Timeout(new
Throwable()))
.map(Left(_))
.foreach { e =>
val ar = processRunResponseContent(e, logger)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services