rabbah closed pull request #3843: Properly consume and optimize reading of http entities, drop connection reuse.
URL: https://github.com/apache/incubator-openwhisk/pull/3843
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 02e94531b9..0983fd899c 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
     compile 'commons-io:commons-io:2.6'
     compile 'commons-collections:commons-collections:3.2.2'
     compile 'org.apache.kafka:kafka-clients:0.11.0.1'
-    compile ('org.apache.httpcomponents:httpclient:4.4.1') {
+    compile ('org.apache.httpcomponents:httpclient:4.5.5') {
         exclude group: 'commons-logging'
     }
     compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3') {
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index bf95441c0f..4ee7363029 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -20,32 +20,29 @@ package whisk.core.containerpool
 import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.duration._
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.control.NoStackTrace
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpPost
-import org.apache.http.client.methods.HttpRequestBase
-import org.apache.http.client.utils.URIBuilder
+import org.apache.http.client.methods.{HttpPost, HttpRequestBase}
+import org.apache.http.client.utils.{HttpClientUtils, URIBuilder}
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
+import org.apache.http.impl.NoConnectionReuseStrategy
 import org.apache.http.impl.client.HttpClientBuilder
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
+import org.apache.http.util.EntityUtils
 import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
 
+import scala.annotation.tailrec
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NoStackTrace
+
 /**
  * This HTTP client is used only in the invoker to communicate with the action 
container.
  * It allows to POST a JSON object and receive JSON object back; that is the
@@ -68,7 +65,7 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
    * This will close the HttpClient that is generated for this instance of 
HttpUtils. That will also cause the
    * ConnectionManager to be closed alongside.
    */
-  def close() = Try(connection.close())
+  def close(): Unit = HttpClientUtils.closeQuietly(connection)
 
   /**
    * Posts to hostname/endpoint the given JSON object.
@@ -106,13 +103,20 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
           val statusCode = response.getStatusLine.getStatusCode
           val contentLength = entity.getContentLength
 
+          // Negative contentLength means unknown or overflow. We don't want 
to consume in either case.
           if (contentLength >= 0) {
-            val bytesToRead = Math.min(contentLength, maxResponseBytes)
-            val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
-            val str = new String(bytes, StandardCharsets.UTF_8)
-            val truncated = if (contentLength <= maxResponseBytes) None else 
Some(contentLength.B, maxResponse)
-            Right(ContainerResponse(statusCode, str, truncated))
+            if (contentLength <= maxResponseBytes) {
+              // optimized route to consume the entire stream into a string
+              val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) 
// consumes and closes the whole stream
+              Right(ContainerResponse(statusCode, str, None))
+            } else {
+              // only consume a bounded number of bytes according to the 
system limits
+              val str = new String(IOUtils.toByteArray(entity.getContent, 
maxResponseBytes), StandardCharsets.UTF_8)
+              EntityUtils.consumeQuietly(entity) // consume the rest of the 
stream to free the connection
+              Right(ContainerResponse(statusCode, str, Some(contentLength.B, 
maxResponse)))
+            }
           } else {
+            EntityUtils.consumeQuietly(entity) // silently consume the whole 
stream to free the connection
             Left(NoResponseReceived())
           }
         }
@@ -164,7 +168,10 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
 
   private val connection = HttpClientBuilder.create
     .setDefaultRequestConfig(httpconfig)
-    .setConnectionManager(if (maxConcurrent > 1) {
+    // Connections are not reused by most of the available runtimes. To 
circumvent any issues we might have regarding
+    // connections randomly breaking due to our pause/resume cycle, we don't 
reuse connections at all.
+    .setConnectionReuseStrategy(new NoConnectionReuseStrategy)
+    .setConnectionManager {
       // A PoolingHttpClientConnectionManager is the default when not 
specifying any ConnectionManager.
       // The PoolingHttpClientConnectionManager has the benefit of actively 
checking if a connection has become stale,
       // which is very important because pausing/resuming containers can cause 
a connection to become silently broken.
@@ -178,7 +185,7 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
       cm.setDefaultMaxPerRoute(maxConcurrent)
       cm.setMaxTotal(maxConcurrent)
       cm
-    } else null)
+    }
     .useSystemProperties()
     .disableAutomaticRetries()
     .build
@@ -214,7 +221,7 @@ object HttpUtils {
       case Right(r)                   => (r.statusCode, 
Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no 
response from container")
       case Left(Timeout(_))           => throw new 
java.util.concurrent.TimeoutException()
-      case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
+      case Left(ConnectionError(_: java.net.SocketTimeoutException)) =>
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new 
IllegalStateException(t.getMessage)
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to