This is an automated email from the ASF dual-hosted git repository.
stanciu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 3b71cfe Make activation response truncation length configurable
(#4909)
3b71cfe is described below
commit 3b71cfe1cd65fae391a164c4ea67fe4c290bb783
Author: Cosmin Stanciu <[email protected]>
AuthorDate: Mon Jun 1 10:31:19 2020 -0700
Make activation response truncation length configurable (#4909)
* Make activation payload truncation length configurable
* Update tests
* Change configuration to use HOCON
---
common/scala/src/main/resources/application.conf | 3 ++-
.../core/containerpool/AkkaContainerClient.scala | 11 ++++----
.../ApacheBlockingContainerClient.scala | 8 +++---
.../openwhisk/core/containerpool/Container.scala | 9 ++++++-
.../core/entity/ActivationEntityLimit.scala | 6 ++---
.../containerpool/docker/DockerContainer.scala | 9 ++++++-
.../docker/test/AkkaContainerClientTests.scala | 29 ++++++++++++++--------
.../test/ApacheBlockingContainerClientTests.scala | 18 +++++++-------
8 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 994f2cd..6e4a88b 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -351,7 +351,8 @@ whisk {
activation {
payload {
- max = 1048576
+ max = 1 m
+ truncation = 1 m
}
# Action responses sent through Kafka can contain up to 3018 bytes of
metadata
# CompletionMessage
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 3dea6db..4e06d61 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -70,6 +70,7 @@ protected class AkkaContainerClient(
port: Int,
timeout: FiniteDuration,
maxResponse: ByteSize,
+ truncation: ByteSize,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging,
as: ActorSystem)
extends PoolingRestClient("http", hostname, port, queueSize, timeout =
Some(timeout))
@@ -174,12 +175,12 @@ protected class AkkaContainerClient(
tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String)
case (Seq(prefix), tail) =>
val truncatedResponse = previouslyCaptured ++ prefix
- if (truncatedResponse.size < maxResponse.toBytes) {
+ if (truncatedResponse.size < truncation.toBytes) {
truncated(tail, truncatedResponse)
} else {
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
- //captured string MAY be larger than the max response, so take only
maxResponse bytes to get the exact length
- tail.runWith(Sink.ignore).map(_ =>
truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
+ //captured string MAY be larger than the truncation size, so take
only truncation bytes to get the exact length
+ tail.runWith(Sink.ignore).map(_ =>
truncatedResponse.take(truncation.toBytes.toInt).utf8String)
}
}
}
@@ -193,7 +194,7 @@ object AkkaContainerClient {
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsObject]) = {
- val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB,
1)
val response = executeRequest(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional
timeout to complete futures
connection.close()
@@ -206,7 +207,7 @@ object AkkaContainerClient {
tid: TransactionId,
as: ActorSystem,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
- val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB,
1)
val futureResults = contents.map { executeRequest(connection, endPoint, _)
}
val results = Await.result(Future.sequence(futureResults), timeout +
10.seconds) //additional timeout to complete futures
connection.close()
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
index ecfda42..1ba7732 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala
@@ -67,6 +67,7 @@ protected[containerpool] case class
RetryableConnectionError(t: Throwable) exten
protected class ApacheBlockingContainerClient(hostname: String,
timeout: FiniteDuration,
maxResponse: ByteSize,
+ truncation: ByteSize,
maxConcurrent: Int = 1)(implicit
logging: Logging, ec: ExecutionContext)
extends ContainerClient {
@@ -129,7 +130,7 @@ protected class ApacheBlockingContainerClient(hostname:
String,
Right(ContainerResponse(statusCode, str, None))
} else {
// only consume a bounded number of bytes according to the
system limits
- val str = new String(IOUtils.toByteArray(entity.getContent,
maxResponseBytes), StandardCharsets.UTF_8)
+ val str = new String(IOUtils.toByteArray(entity.getContent,
truncationBytes), StandardCharsets.UTF_8)
EntityUtils.consumeQuietly(entity) // consume the rest of the
stream to free the connection
Right(ContainerResponse(statusCode, str, Some(contentLength.B,
maxResponse)))
}
@@ -180,6 +181,7 @@ protected class ApacheBlockingContainerClient(hostname:
String,
}
private val maxResponseBytes = maxResponse.toBytes
+ private val truncationBytes = truncation.toBytes
private val baseUri = new URIBuilder()
.setScheme("http")
@@ -227,7 +229,7 @@ object ApacheBlockingContainerClient {
tid: TransactionId,
ec: ExecutionContext): (Int, Option[JsObject]) = {
val timeout = 90.seconds
- val connection = new ApacheBlockingContainerClient(s"$host:$port",
timeout, 1.MB)
+ val connection = new ApacheBlockingContainerClient(s"$host:$port",
timeout, 1.MB, 1.MB)
val response = executeRequest(connection, endPoint, content)
val result = Await.result(response, timeout)
connection.close()
@@ -239,7 +241,7 @@ object ApacheBlockingContainerClient {
implicit logging: Logging,
tid: TransactionId,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
- val connection = new ApacheBlockingContainerClient(s"$host:$port",
90.seconds, 1.MB, contents.size)
+ val connection = new ApacheBlockingContainerClient(s"$host:$port",
90.seconds, 1.MB, 1.MB, contents.size)
val futureResults = contents.map { content =>
executeRequest(connection, endPoint, content)
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 5080dae..6717198 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -220,12 +220,19 @@ trait Container {
}
private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
if (Container.config.akkaClient) {
- new AkkaContainerClient(addr.host, addr.port, timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+ new AkkaContainerClient(
+ addr.host,
+ addr.port,
+ timeout,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ 1024)
} else {
new ApacheBlockingContainerClient(
s"${addr.host}:${addr.port}",
timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
maxConcurrent)
}
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
index a98a5c1..45685e3 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala
@@ -20,9 +20,9 @@ package org.apache.openwhisk.core.entity
import pureconfig._
import pureconfig.generic.auto._
import org.apache.openwhisk.core.ConfigKeys
-import org.apache.openwhisk.core.entity.size.SizeLong
+import org.apache.openwhisk.core.entity.size._
-case class ActivationEntityPayload(max: ByteSize)
+case class ActivationEntityPayload(max: ByteSize, truncation: ByteSize)
case class ActivationEntityLimitConf(serdesOverhead: ByteSize, payload:
ActivationEntityPayload)
/**
@@ -31,9 +31,9 @@ case class ActivationEntityLimitConf(serdesOverhead:
ByteSize, payload: Activati
* parameters for triggers.
*/
protected[core] object ActivationEntityLimit {
- private implicit val pureconfigLongReader: ConfigReader[ByteSize] =
ConfigReader[Long].map(_.bytes)
private val config =
loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activation)
protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize =
config.payload.max
+ protected[core] val MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT: ByteSize =
config.payload.truncation
protected[core] val MAX_ACTIVATION_LIMIT: ByteSize = config.payload.max +
config.serdesOverhead
}
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index a0163b8..ca0df62 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -220,12 +220,19 @@ class DockerContainer(protected val id: ContainerId,
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = if (Container.config.akkaClient) {
- new AkkaContainerClient(addr.host, addr.port, timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+ new AkkaContainerClient(
+ addr.host,
+ addr.port,
+ timeout,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
+ 1024)
} else {
new ApacheBlockingContainerClient(
s"${addr.host}:${addr.port}",
timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
maxConcurrent)
}
httpConnection = Some(conn)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
index c2da266..a3dd872 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala
@@ -109,7 +109,7 @@ class AkkaContainerClientTests
it should "not wait longer than set timeout" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout, 1.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout, 1.B, 1.B, 100)
testHang = timeout * 2
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
@@ -123,7 +123,7 @@ class AkkaContainerClientTests
it should "handle empty entity response" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout, 1.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout, 1.B, 1.B, 100)
testStatusCode = 204
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
result shouldBe Left(NoResponseReceived())
@@ -131,7 +131,7 @@ class AkkaContainerClientTests
it should "retry till timeout on StreamTcpException" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B,
100)
+ val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B,
1.B, 100)
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
val end = Instant.now()
@@ -146,7 +146,7 @@ class AkkaContainerClientTests
it should "throw ContainerHealthError on HttpHostConnectException if
reschedule==true" in {
val timeout = 5.seconds
- val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B,
100)
+ val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B,
1.B, 100)
assertThrows[ContainerHealthError] {
Await.result(connection.post("/run", JsObject.empty, retry = false,
reschedule = true), 10.seconds)
}
@@ -156,7 +156,7 @@ class AkkaContainerClientTests
val timeout = 5.seconds
val retryInterval = 500.milliseconds
val connection =
- new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout,
1.B, 100, retryInterval)
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout,
1.B, 1.B, 100, retryInterval)
val start = Instant.now()
testConnectionFailCount = 5
testResponse = ""
@@ -173,7 +173,7 @@ class AkkaContainerClientTests
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
- val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout.millis, 50.B, 100)
+ val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout.millis, 50.B, 50.B, 100)
Seq(true, false).foreach { success =>
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
@@ -188,15 +188,17 @@ class AkkaContainerClientTests
it should "truncate responses that exceed limit" in {
val timeout = 1.minute.toMillis
- val limit = 1.B
- val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout.millis, limit, 100)
+ val limit = 2.B
+ val truncationLimit = 1.B
+ val connection =
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort,
timeout.millis, limit, truncationLimit, 100)
Seq(true, false).foreach { success =>
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
val result = Await.result(connection.post("/init", JsObject.empty,
retry = true), 10.seconds)
result shouldBe Right {
- ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt),
Some((r.length.B, limit)))
+ ContainerResponse(okStatus = success,
r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
}
}
}
@@ -207,7 +209,9 @@ class AkkaContainerClientTests
//use a limit large enough to not fit into a single ByteString as response
entity is parsed into multiple ByteStrings
//seems like this varies, but often is ~64k or ~128k
val limit = 300.KB
- val connection = new AkkaContainerClient(httpHost.getHostName,
httpHost.getPort, timeout.millis, limit, 100)
+ val truncationLimit = 299.B
+ val connection =
+ new AkkaContainerClient(httpHost.getHostName, httpHost.getPort,
timeout.millis, limit, truncationLimit, 100)
Seq(true, false).foreach { success =>
// Generate a response that's 1MB
val response = "0" * 1024 * 1024
@@ -215,7 +219,10 @@ class AkkaContainerClientTests
testResponse = response
val result = Await.result(connection.post("/init", JsObject.empty, retry
= true), 10.seconds)
result shouldBe Right {
- ContainerResponse(okStatus = success,
response.take(limit.toBytes.toInt), Some((response.length.B, limit)))
+ ContainerResponse(
+ okStatus = success,
+ response.take(truncationLimit.toBytes.toInt),
+ Some((response.length.B, limit)))
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
index eee65a3..d40b1e9 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala
@@ -100,7 +100,7 @@ class ApacheBlockingContainerClientTests
it should "not wait longer than set timeout" in {
val timeout = 5.seconds
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout,
1.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout,
1.B, 1.B)
testHang = timeout * 2
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
@@ -114,7 +114,7 @@ class ApacheBlockingContainerClientTests
it should "handle empty entity response" in {
val timeout = 5.seconds
- val connection = new ApacheBlockingContainerClient(hostWithPort, timeout,
1.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort, timeout,
1.B, 1.B)
testStatusCode = 204
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
result shouldBe Left(NoResponseReceived())
@@ -123,7 +123,7 @@ class ApacheBlockingContainerClientTests
it should "retry till timeout on HttpHostConnectException" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
- val connection = new ApacheBlockingContainerClient(badHostAndPort,
timeout, 1.B)
+ val connection = new ApacheBlockingContainerClient(badHostAndPort,
timeout, 1.B, 1.B)
testStatusCode = 204
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry =
true), 10.seconds)
@@ -142,7 +142,7 @@ class ApacheBlockingContainerClientTests
it should "throw ContainerHealthError on HttpHostConnectException if
reschedule==true" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
- val connection = new ApacheBlockingContainerClient(badHostAndPort,
timeout, 1.B)
+ val connection = new ApacheBlockingContainerClient(badHostAndPort,
timeout, 1.B, 1.B)
assertThrows[ContainerHealthError] {
Await.result(connection.post("/run", JsObject.empty, retry = false,
reschedule = true), 10.seconds)
}
@@ -150,7 +150,7 @@ class ApacheBlockingContainerClientTests
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
- val connection = new ApacheBlockingContainerClient(hostWithPort,
timeout.millis, 50.B)
+ val connection = new ApacheBlockingContainerClient(hostWithPort,
timeout.millis, 50.B, 50.B)
Seq(true, false).foreach { success =>
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
@@ -165,16 +165,16 @@ class ApacheBlockingContainerClientTests
it should "truncate responses that exceed limit" in {
val timeout = 1.minute.toMillis
- val limit = 1.B
- val excess = limit + 1.B
- val connection = new ApacheBlockingContainerClient(hostWithPort,
timeout.millis, limit)
+ val limit = 2.B
+ val truncationLimit = 1.B
+ val connection = new ApacheBlockingContainerClient(hostWithPort,
timeout.millis, limit, truncationLimit)
Seq(true, false).foreach { success =>
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
val result = Await.result(connection.post("/init", JsObject.empty,
retry = true), 10.seconds)
result shouldBe Right {
- ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt),
Some((r.length.B, limit)))
+ ContainerResponse(okStatus = success,
r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
}
}
}