This is an automated email from the ASF dual-hosted git repository.

stanciu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b71cfe  Make activation response truncation length configurable  
(#4909)
3b71cfe is described below

commit 3b71cfe1cd65fae391a164c4ea67fe4c290bb783
Author: Cosmin Stanciu <[email protected]>
AuthorDate: Mon Jun 1 10:31:19 2020 -0700

    Make activation response truncation length configurable  (#4909)
    
    * Make activation payload truncation length configurable
    * Update tests
    * Change configuration to use HOCON
---
 common/scala/src/main/resources/application.conf   |  3 ++-
 .../core/containerpool/AkkaContainerClient.scala   | 11 ++++----
 .../ApacheBlockingContainerClient.scala            |  8 +++---
 .../openwhisk/core/containerpool/Container.scala   |  9 ++++++-
 .../core/entity/ActivationEntityLimit.scala        |  6 ++---
 .../containerpool/docker/DockerContainer.scala     |  9 ++++++-
 .../docker/test/AkkaContainerClientTests.scala     | 29 ++++++++++++++--------
 .../test/ApacheBlockingContainerClientTests.scala  | 18 +++++++-------
 8 files changed, 59 insertions(+), 34 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 994f2cd..6e4a88b 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -351,7 +351,8 @@ whisk {
 
     activation {
         payload {
-            max = 1048576
+            max = 1 m
+            truncation = 1 m
         }
         # Action responses sent through Kafka can contain up to 3018 bytes of 
metadata
         #   CompletionMessage
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 3dea6db..4e06d61 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -70,6 +70,7 @@ protected class AkkaContainerClient(
   port: Int,
   timeout: FiniteDuration,
   maxResponse: ByteSize,
+  truncation: ByteSize,
   queueSize: Int,
   retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, 
as: ActorSystem)
     extends PoolingRestClient("http", hostname, port, queueSize, timeout = 
Some(timeout))
@@ -174,12 +175,12 @@ protected class AkkaContainerClient(
         tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String)
       case (Seq(prefix), tail) =>
         val truncatedResponse = previouslyCaptured ++ prefix
-        if (truncatedResponse.size < maxResponse.toBytes) {
+        if (truncatedResponse.size < truncation.toBytes) {
           truncated(tail, truncatedResponse)
         } else {
           //ignore the tail (MUST CONSUME ENTIRE ENTITY!)
-          //captured string MAY be larger than the max response, so take only 
maxResponse bytes to get the exact length
-          tail.runWith(Sink.ignore).map(_ => 
truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
+          //captured string MAY be larger than the truncation size, so take 
only truncation bytes to get the exact length
+          tail.runWith(Sink.ignore).map(_ => 
truncatedResponse.take(truncation.toBytes.toInt).utf8String)
         }
     }
   }
@@ -193,7 +194,7 @@ object AkkaContainerClient {
     as: ActorSystem,
     ec: ExecutionContext,
     tid: TransactionId): (Int, Option[JsObject]) = {
-    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 
1)
     val response = executeRequest(connection, endPoint, content)
     val result = Await.result(response, timeout + 10.seconds) //additional 
timeout to complete futures
     connection.close()
@@ -206,7 +207,7 @@ object AkkaContainerClient {
     tid: TransactionId,
     as: ActorSystem,
     ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
-    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+    val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 
1)
     val futureResults = contents.map { executeRequest(connection, endPoint, _) 
}
     val results = Await.result(Future.sequence(futureResults), timeout + 
10.seconds) //additional timeout to complete futures
     connection.close()
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
index ecfda42..1ba7732 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -67,6 +67,7 @@ protected[containerpool] case class 
RetryableConnectionError(t: Throwable) exten
 protected class ApacheBlockingContainerClient(hostname: String,
                                               timeout: FiniteDuration,
                                               maxResponse: ByteSize,
+                                              truncation: ByteSize,
                                               maxConcurrent: Int = 1)(implicit 
logging: Logging, ec: ExecutionContext)
     extends ContainerClient {
 
@@ -129,7 +130,7 @@ protected class ApacheBlockingContainerClient(hostname: 
String,
               Right(ContainerResponse(statusCode, str, None))
             } else {
               // only consume a bounded number of bytes according to the 
system limits
-              val str = new String(IOUtils.toByteArray(entity.getContent, 
maxResponseBytes), StandardCharsets.UTF_8)
+              val str = new String(IOUtils.toByteArray(entity.getContent, 
truncationBytes), StandardCharsets.UTF_8)
               EntityUtils.consumeQuietly(entity) // consume the rest of the 
stream to free the connection
               Right(ContainerResponse(statusCode, str, Some(contentLength.B, 
maxResponse)))
             }
@@ -180,6 +181,7 @@ protected class ApacheBlockingContainerClient(hostname: 
String,
   }
 
   private val maxResponseBytes = maxResponse.toBytes
+  private val truncationBytes = truncation.toBytes
 
   private val baseUri = new URIBuilder()
     .setScheme("http")
@@ -227,7 +229,7 @@ object ApacheBlockingContainerClient {
     tid: TransactionId,
     ec: ExecutionContext): (Int, Option[JsObject]) = {
     val timeout = 90.seconds
-    val connection = new ApacheBlockingContainerClient(s"$host:$port", 
timeout, 1.MB)
+    val connection = new ApacheBlockingContainerClient(s"$host:$port", 
timeout, 1.MB, 1.MB)
     val response = executeRequest(connection, endPoint, content)
     val result = Await.result(response, timeout)
     connection.close()
@@ -239,7 +241,7 @@ object ApacheBlockingContainerClient {
     implicit logging: Logging,
     tid: TransactionId,
     ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
-    val connection = new ApacheBlockingContainerClient(s"$host:$port", 
90.seconds, 1.MB, contents.size)
+    val connection = new ApacheBlockingContainerClient(s"$host:$port", 
90.seconds, 1.MB, 1.MB, contents.size)
     val futureResults = contents.map { content =>
       executeRequest(connection, endPoint, content)
     }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 5080dae..6717198 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -220,12 +220,19 @@ trait Container {
   }
   private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
     if (Container.config.akkaClient) {
-      new AkkaContainerClient(addr.host, addr.port, timeout, 
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+      new AkkaContainerClient(
+        addr.host,
+        addr.port,
+        timeout,
+        ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+        ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+        1024)
     } else {
       new ApacheBlockingContainerClient(
         s"${addr.host}:${addr.port}",
         timeout,
         ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+        ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
         maxConcurrent)
     }
   }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
index a98a5c1..45685e3 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
@@ -20,9 +20,9 @@ package org.apache.openwhisk.core.entity
 import pureconfig._
 import pureconfig.generic.auto._
 import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.entity.size.SizeLong
+import org.apache.openwhisk.core.entity.size._
 
-case class ActivationEntityPayload(max: ByteSize)
+case class ActivationEntityPayload(max: ByteSize, truncation: ByteSize)
 case class ActivationEntityLimitConf(serdesOverhead: ByteSize, payload: 
ActivationEntityPayload)
 
 /**
@@ -31,9 +31,9 @@ case class ActivationEntityLimitConf(serdesOverhead: 
ByteSize, payload: Activati
  * parameters for triggers.
  */
 protected[core] object ActivationEntityLimit {
-  private implicit val pureconfigLongReader: ConfigReader[ByteSize] = 
ConfigReader[Long].map(_.bytes)
   private val config = 
loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activation)
 
   protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = 
config.payload.max
+  protected[core] val MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT: ByteSize = 
config.payload.truncation
   protected[core] val MAX_ACTIVATION_LIMIT: ByteSize = config.payload.max + 
config.serdesOverhead
 }
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index a0163b8..ca0df62 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -220,12 +220,19 @@ class DockerContainer(protected val id: ContainerId,
     val started = Instant.now()
     val http = httpConnection.getOrElse {
       val conn = if (Container.config.akkaClient) {
-        new AkkaContainerClient(addr.host, addr.port, timeout, 
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+        new AkkaContainerClient(
+          addr.host,
+          addr.port,
+          timeout,
+          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+          1024)
       } else {
         new ApacheBlockingContainerClient(
           s"${addr.host}:${addr.port}",
           timeout,
           ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
           maxConcurrent)
       }
       httpConnection = Some(conn)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
index c2da266..a3dd872 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -109,7 +109,7 @@ class AkkaContainerClientTests
 
   it should "not wait longer than set timeout" in {
     val timeout = 5.seconds
-    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout, 1.B, 100)
+    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout, 1.B, 1.B, 100)
     testHang = timeout * 2
     val start = Instant.now()
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
@@ -123,7 +123,7 @@ class AkkaContainerClientTests
 
   it should "handle empty entity response" in {
     val timeout = 5.seconds
-    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout, 1.B, 100)
+    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout, 1.B, 1.B, 100)
     testStatusCode = 204
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
     result shouldBe Left(NoResponseReceived())
@@ -131,7 +131,7 @@ class AkkaContainerClientTests
 
   it should "retry till timeout on StreamTcpException" in {
     val timeout = 5.seconds
-    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 
100)
+    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 
1.B, 100)
     val start = Instant.now()
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
     val end = Instant.now()
@@ -146,7 +146,7 @@ class AkkaContainerClientTests
 
   it should "throw ContainerHealthError on HttpHostConnectException if 
reschedule==true" in {
     val timeout = 5.seconds
-    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 
100)
+    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 
1.B, 100)
     assertThrows[ContainerHealthError] {
       Await.result(connection.post("/run", JsObject.empty, retry = false, 
reschedule = true), 10.seconds)
     }
@@ -156,7 +156,7 @@ class AkkaContainerClientTests
     val timeout = 5.seconds
     val retryInterval = 500.milliseconds
     val connection =
-      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 
1.B, 100, retryInterval)
+      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 
1.B, 1.B, 100, retryInterval)
     val start = Instant.now()
     testConnectionFailCount = 5
     testResponse = ""
@@ -173,7 +173,7 @@ class AkkaContainerClientTests
 
   it should "not truncate responses within limit" in {
     val timeout = 1.minute.toMillis
-    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout.millis, 50.B, 100)
+    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout.millis, 50.B, 50.B, 100)
     Seq(true, false).foreach { success =>
       Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (success) 200 else 500
@@ -188,15 +188,17 @@ class AkkaContainerClientTests
 
   it should "truncate responses that exceed limit" in {
     val timeout = 1.minute.toMillis
-    val limit = 1.B
-    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout.millis, limit, 100)
+    val limit = 2.B
+    val truncationLimit = 1.B
+    val connection =
+      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, 
timeout.millis, limit, truncationLimit, 100)
     Seq(true, false).foreach { success =>
       Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (success) 200 else 500
         testResponse = r
         val result = Await.result(connection.post("/init", JsObject.empty, 
retry = true), 10.seconds)
         result shouldBe Right {
-          ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), 
Some((r.length.B, limit)))
+          ContainerResponse(okStatus = success, 
r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
         }
       }
     }
@@ -207,7 +209,9 @@ class AkkaContainerClientTests
     //use a limit large enough to not fit into a single ByteString as response 
entity is parsed into multiple ByteStrings
     //seems like this varies, but often is ~64k or ~128k
     val limit = 300.KB
-    val connection = new AkkaContainerClient(httpHost.getHostName, 
httpHost.getPort, timeout.millis, limit, 100)
+    val truncationLimit = 299.B
+    val connection =
+      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, 
timeout.millis, limit, truncationLimit, 100)
     Seq(true, false).foreach { success =>
       // Generate a response that's 1MB
       val response = "0" * 1024 * 1024
@@ -215,7 +219,10 @@ class AkkaContainerClientTests
       testResponse = response
       val result = Await.result(connection.post("/init", JsObject.empty, retry 
= true), 10.seconds)
       result shouldBe Right {
-        ContainerResponse(okStatus = success, 
response.take(limit.toBytes.toInt), Some((response.length.B, limit)))
+        ContainerResponse(
+          okStatus = success,
+          response.take(truncationLimit.toBytes.toInt),
+          Some((response.length.B, limit)))
       }
 
     }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index eee65a3..d40b1e9 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -100,7 +100,7 @@ class ApacheBlockingContainerClientTests
 
   it should "not wait longer than set timeout" in {
     val timeout = 5.seconds
-    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 
1.B)
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 
1.B, 1.B)
     testHang = timeout * 2
     val start = Instant.now()
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
@@ -114,7 +114,7 @@ class ApacheBlockingContainerClientTests
 
   it should "handle empty entity response" in {
     val timeout = 5.seconds
-    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 
1.B)
+    val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 
1.B, 1.B)
     testStatusCode = 204
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
     result shouldBe Left(NoResponseReceived())
@@ -123,7 +123,7 @@ class ApacheBlockingContainerClientTests
   it should "retry till timeout on HttpHostConnectException" in {
     val timeout = 5.seconds
     val badHostAndPort = "0.0.0.0:12345"
-    val connection = new ApacheBlockingContainerClient(badHostAndPort, 
timeout, 1.B)
+    val connection = new ApacheBlockingContainerClient(badHostAndPort, 
timeout, 1.B, 1.B)
     testStatusCode = 204
     val start = Instant.now()
     val result = Await.result(connection.post("/init", JsObject.empty, retry = 
true), 10.seconds)
@@ -142,7 +142,7 @@ class ApacheBlockingContainerClientTests
   it should "throw ContainerHealthError on HttpHostConnectException if 
reschedule==true" in {
     val timeout = 5.seconds
     val badHostAndPort = "0.0.0.0:12345"
-    val connection = new ApacheBlockingContainerClient(badHostAndPort, 
timeout, 1.B)
+    val connection = new ApacheBlockingContainerClient(badHostAndPort, 
timeout, 1.B, 1.B)
     assertThrows[ContainerHealthError] {
       Await.result(connection.post("/run", JsObject.empty, retry = false, 
reschedule = true), 10.seconds)
     }
@@ -150,7 +150,7 @@ class ApacheBlockingContainerClientTests
 
   it should "not truncate responses within limit" in {
     val timeout = 1.minute.toMillis
-    val connection = new ApacheBlockingContainerClient(hostWithPort, 
timeout.millis, 50.B)
+    val connection = new ApacheBlockingContainerClient(hostWithPort, 
timeout.millis, 50.B, 50.B)
     Seq(true, false).foreach { success =>
       Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (success) 200 else 500
@@ -165,16 +165,16 @@ class ApacheBlockingContainerClientTests
 
   it should "truncate responses that exceed limit" in {
     val timeout = 1.minute.toMillis
-    val limit = 1.B
-    val excess = limit + 1.B
-    val connection = new ApacheBlockingContainerClient(hostWithPort, 
timeout.millis, limit)
+    val limit = 2.B
+    val truncationLimit = 1.B
+    val connection = new ApacheBlockingContainerClient(hostWithPort, 
timeout.millis, limit, truncationLimit)
     Seq(true, false).foreach { success =>
       Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
         testStatusCode = if (success) 200 else 500
         testResponse = r
         val result = Await.result(connection.post("/init", JsObject.empty, 
retry = true), 10.seconds)
         result shouldBe Right {
-          ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), 
Some((r.length.B, limit)))
+          ContainerResponse(okStatus = success, 
r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
         }
       }
     }

Reply via email to