rabbah closed pull request #3843: Properly consume and optimize reading of http
entities, drop connection reuse.
URL: https://github.com/apache/incubator-openwhisk/pull/3843
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not display it otherwise:
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 02e94531b9..0983fd899c 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
compile 'commons-io:commons-io:2.6'
compile 'commons-collections:commons-collections:3.2.2'
compile 'org.apache.kafka:kafka-clients:0.11.0.1'
- compile ('org.apache.httpcomponents:httpclient:4.4.1') {
+ compile ('org.apache.httpcomponents:httpclient:4.5.5') {
exclude group: 'commons-logging'
}
compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3') {
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index bf95441c0f..4ee7363029 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -20,32 +20,29 @@ package whisk.core.containerpool
import java.net.NoRouteToHostException
import java.nio.charset.StandardCharsets
-import scala.concurrent.duration._
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.control.NoStackTrace
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
import org.apache.commons.io.IOUtils
import org.apache.http.HttpHeaders
import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpPost
-import org.apache.http.client.methods.HttpRequestBase
-import org.apache.http.client.utils.URIBuilder
+import org.apache.http.client.methods.{HttpPost, HttpRequestBase}
+import org.apache.http.client.utils.{HttpClientUtils, URIBuilder}
import org.apache.http.conn.HttpHostConnectException
import org.apache.http.entity.StringEntity
+import org.apache.http.impl.NoConnectionReuseStrategy
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
+import org.apache.http.util.EntityUtils
import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
import whisk.core.entity.ActivationResponse._
import whisk.core.entity.ByteSize
import whisk.core.entity.size.SizeLong
+import scala.annotation.tailrec
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NoStackTrace
+
/**
* This HTTP client is used only in the invoker to communicate with the action
container.
* It allows to POST a JSON object and receive JSON object back; that is the
@@ -68,7 +65,7 @@ protected class HttpUtils(hostname: String, timeout:
FiniteDuration, maxResponse
* This will close the HttpClient that is generated for this instance of
HttpUtils. That will also cause the
* ConnectionManager to be closed alongside.
*/
- def close() = Try(connection.close())
+ def close(): Unit = HttpClientUtils.closeQuietly(connection)
/**
* Posts to hostname/endpoint the given JSON object.
@@ -106,13 +103,20 @@ protected class HttpUtils(hostname: String, timeout:
FiniteDuration, maxResponse
val statusCode = response.getStatusLine.getStatusCode
val contentLength = entity.getContentLength
+ // Negative contentLength means unknown or overflow. We don't want
to consume in either case.
if (contentLength >= 0) {
- val bytesToRead = Math.min(contentLength, maxResponseBytes)
- val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
- val str = new String(bytes, StandardCharsets.UTF_8)
- val truncated = if (contentLength <= maxResponseBytes) None else
Some(contentLength.B, maxResponse)
- Right(ContainerResponse(statusCode, str, truncated))
+ if (contentLength <= maxResponseBytes) {
+ // optimized route to consume the entire stream into a string
+ val str = EntityUtils.toString(entity, StandardCharsets.UTF_8)
// consumes and closes the whole stream
+ Right(ContainerResponse(statusCode, str, None))
+ } else {
+ // only consume a bounded number of bytes according to the
system limits
+ val str = new String(IOUtils.toByteArray(entity.getContent,
maxResponseBytes), StandardCharsets.UTF_8)
+ EntityUtils.consumeQuietly(entity) // consume the rest of the
stream to free the connection
+ Right(ContainerResponse(statusCode, str, Some(contentLength.B,
maxResponse)))
+ }
} else {
+ EntityUtils.consumeQuietly(entity) // silently consume the whole
stream to free the connection
Left(NoResponseReceived())
}
}
@@ -164,7 +168,10 @@ protected class HttpUtils(hostname: String, timeout:
FiniteDuration, maxResponse
private val connection = HttpClientBuilder.create
.setDefaultRequestConfig(httpconfig)
- .setConnectionManager(if (maxConcurrent > 1) {
+ // Connections are not reused by most of the available runtimes. To
circumvent any issues we might have regarding
+ // connections randomly breaking due to our pause/resume cycle, we don't
reuse connections at all.
+ .setConnectionReuseStrategy(new NoConnectionReuseStrategy)
+ .setConnectionManager {
// A PoolingHttpClientConnectionManager is the default when not
specifying any ConnectionManager.
// The PoolingHttpClientConnectionManager has the benefit of actively
checking if a connection has become stale,
// which is very important because pausing/resuming containers can cause
a connection to become silently broken.
@@ -178,7 +185,7 @@ protected class HttpUtils(hostname: String, timeout:
FiniteDuration, maxResponse
cm.setDefaultMaxPerRoute(maxConcurrent)
cm.setMaxTotal(maxConcurrent)
cm
- } else null)
+ }
.useSystemProperties()
.disableAutomaticRetries()
.build
@@ -214,7 +221,7 @@ object HttpUtils {
case Right(r) => (r.statusCode,
Try(r.entity.parseJson.asJsObject).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no
response from container")
case Left(Timeout(_)) => throw new
java.util.concurrent.TimeoutException()
- case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
+ case Left(ConnectionError(_: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new
IllegalStateException(t.getMessage)
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services