This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e8414  Properly consume and optimize reading of http entities, drop 
connection reuse. (#3843)
e7e8414 is described below

commit e7e841476dd30905dc8b4997f3af1ff0adba73d0
Author: Markus Thömmes <[email protected]>
AuthorDate: Fri Jul 6 13:07:18 2018 +0100

    Properly consume and optimize reading of http entities, drop connection 
reuse. (#3843)
    
    - Explicitly consume and close the response's entity in any case (even if 
ignored) to make sure the connections get released properly and are not leaked.
    - Use optimized path for consuming the entire entity into a string if its 
length is within bounds.
    - Don't reuse connections.
    
    To the last point: Reusing connections when a runtime doesn't support it 
adds a significant latency overhead when closing the response's entity. That's 
likely due to some sort of mismatched behavior (client wants to keep the 
connection open, the server doesn't even know the concept). The latency 
overhead in those cases (10-20ms of latency added in latency tests) seems far 
higher than the latency overhead of establishing a new connection (not 
measurable in latency tests).
    Dropping connection reuse also solves any issues that might be encountered 
due to the pause/resume cycles of the container, in which sockets don't react 
to any event at all.
---
 common/scala/build.gradle                          |  2 +-
 .../scala/whisk/core/containerpool/HttpUtils.scala | 53 ++++++++++++----------
 2 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 02e9453..0983fd8 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -51,7 +51,7 @@ dependencies {
     compile 'commons-io:commons-io:2.6'
     compile 'commons-collections:commons-collections:3.2.2'
     compile 'org.apache.kafka:kafka-clients:0.11.0.1'
-    compile ('org.apache.httpcomponents:httpclient:4.4.1') {
+    compile ('org.apache.httpcomponents:httpclient:4.5.5') {
         exclude group: 'commons-logging'
     }
     compile ('com.fasterxml.uuid:java-uuid-generator:3.1.3') {
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index bf95441..4ee7363 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -20,32 +20,29 @@ package whisk.core.containerpool
 import java.net.NoRouteToHostException
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.duration._
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.control.NoStackTrace
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
 import org.apache.commons.io.IOUtils
 import org.apache.http.HttpHeaders
 import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpPost
-import org.apache.http.client.methods.HttpRequestBase
-import org.apache.http.client.utils.URIBuilder
+import org.apache.http.client.methods.{HttpPost, HttpRequestBase}
+import org.apache.http.client.utils.{HttpClientUtils, URIBuilder}
 import org.apache.http.conn.HttpHostConnectException
 import org.apache.http.entity.StringEntity
+import org.apache.http.impl.NoConnectionReuseStrategy
 import org.apache.http.impl.client.HttpClientBuilder
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
+import org.apache.http.util.EntityUtils
 import spray.json._
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId}
 import whisk.core.entity.ActivationResponse._
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size.SizeLong
 
+import scala.annotation.tailrec
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NoStackTrace
+
 /**
  * This HTTP client is used only in the invoker to communicate with the action 
container.
  * It allows to POST a JSON object and receive JSON object back; that is the
@@ -68,7 +65,7 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
    * This will close the HttpClient that is generated for this instance of 
HttpUtils. That will also cause the
    * ConnectionManager to be closed alongside.
    */
-  def close() = Try(connection.close())
+  def close(): Unit = HttpClientUtils.closeQuietly(connection)
 
   /**
    * Posts to hostname/endpoint the given JSON object.
@@ -106,13 +103,20 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
           val statusCode = response.getStatusLine.getStatusCode
           val contentLength = entity.getContentLength
 
+          // Negative contentLength means unknown or overflow. We don't want 
to consume in either case.
           if (contentLength >= 0) {
-            val bytesToRead = Math.min(contentLength, maxResponseBytes)
-            val bytes = IOUtils.toByteArray(entity.getContent, bytesToRead)
-            val str = new String(bytes, StandardCharsets.UTF_8)
-            val truncated = if (contentLength <= maxResponseBytes) None else 
Some(contentLength.B, maxResponse)
-            Right(ContainerResponse(statusCode, str, truncated))
+            if (contentLength <= maxResponseBytes) {
+              // optimized route to consume the entire stream into a string
+              val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) 
// consumes and closes the whole stream
+              Right(ContainerResponse(statusCode, str, None))
+            } else {
+              // only consume a bounded number of bytes according to the 
system limits
+              val str = new String(IOUtils.toByteArray(entity.getContent, 
maxResponseBytes), StandardCharsets.UTF_8)
+              EntityUtils.consumeQuietly(entity) // consume the rest of the 
stream to free the connection
+              Right(ContainerResponse(statusCode, str, Some(contentLength.B, 
maxResponse)))
+            }
           } else {
+            EntityUtils.consumeQuietly(entity) // silently consume the whole 
stream to free the connection
             Left(NoResponseReceived())
           }
         }
@@ -164,7 +168,10 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
 
   private val connection = HttpClientBuilder.create
     .setDefaultRequestConfig(httpconfig)
-    .setConnectionManager(if (maxConcurrent > 1) {
+    // Connections are not reused by most of the available runtimes. To 
circumvent any issues we might have regarding
+    // connections randomly breaking due to our pause/resume cycle, we don't 
reuse connections at all.
+    .setConnectionReuseStrategy(new NoConnectionReuseStrategy)
+    .setConnectionManager {
       // A PoolingHttpClientConnectionManager is the default when not 
specifying any ConnectionManager.
       // The PoolingHttpClientConnectionManager has the benefit of actively 
checking if a connection has become stale,
       // which is very important because pausing/resuming containers can cause 
a connection to become silently broken.
@@ -178,7 +185,7 @@ protected class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxResponse
       cm.setDefaultMaxPerRoute(maxConcurrent)
       cm.setMaxTotal(maxConcurrent)
       cm
-    } else null)
+    }
     .useSystemProperties()
     .disableAutomaticRetries()
     .build
@@ -214,7 +221,7 @@ object HttpUtils {
       case Right(r)                   => (r.statusCode, 
Try(r.entity.parseJson.asJsObject).toOption)
       case Left(NoResponseReceived()) => throw new IllegalStateException("no 
response from container")
       case Left(Timeout(_))           => throw new 
java.util.concurrent.TimeoutException()
-      case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
+      case Left(ConnectionError(_: java.net.SocketTimeoutException)) =>
         throw new java.util.concurrent.TimeoutException()
       case Left(ConnectionError(t)) => throw new 
IllegalStateException(t.getMessage)
     }

Reply via email to