This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 33d70bf Avoid that actions do not stop after action timeout when
logging heavily (#4299)
33d70bf is described below
commit 33d70bfbb368e8c263e9f7a6fccfd03cf6655d58
Author: Steffen Rost <[email protected]>
AuthorDate: Thu Feb 28 00:39:10 2019 +0000
Avoid that actions do not stop after action timeout when logging heavily
(#4299)
* wait for sentinel only in case no timeout occurred
* introduce timeout annotation
* nodejs:6 logging timeout action w/o busy loop
* do conversion from FiniteDuration to Duration
---
.../logging/DockerToActivationFileLogStore.scala | 5 +--
.../logging/DockerToActivationLogStore.scala | 39 ++++++++++++++++++++--
.../openwhisk/core/entity/WhiskActivation.scala | 17 ++++++----
.../core/containerpool/ContainerProxy.scala | 19 +++++++++--
tests/dat/actions/loggingTimeout.js | 36 ++++++++++++++++++++
tests/src/test/scala/common/TimingHelpers.scala | 6 ++--
.../openwhisk/core/limits/ActionLimitsTests.scala | 36 +++++++++++++++++++-
7 files changed, 140 insertions(+), 18 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 868b9cf..2c9c0a9 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -123,7 +123,8 @@ class DockerToActivationFileLogStore(system: ActorSystem,
destinationDirectory:
container: Container,
action: ExecutableWhiskAction):
Future[ActivationLogs] = {
- val logs = container.logs(action.limits.logs.asMegaBytes,
action.exec.sentinelledLogs)(transid)
+ val isTimedoutActivation = activation.isTimedoutActivation
+ val logs = logStream(transid, container, action, isTimedoutActivation)
// Adding the userId field to every written record, so any background
process can properly correlate.
val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
@@ -150,7 +151,7 @@ class DockerToActivationFileLogStore(system: ActorSystem,
destinationDirectory:
logs.runWith(combined)._1.flatMap { seq =>
val possibleErrors = Set(Messages.logFailure,
Messages.truncateLogs(action.limits.logs.asMegaBytes))
- val errored = seq.lastOption.exists(last =>
possibleErrors.exists(last.contains))
+ val errored = isTimedoutActivation || seq.lastOption.exists(last =>
possibleErrors.exists(last.contains))
val logs = ActivationLogs(seq.toVector)
if (!errored) {
Future.successful(logs)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 7fa1ac6..4173a95 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -17,11 +17,13 @@
package org.apache.openwhisk.core.containerpool.logging
+import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.apache.openwhisk.common.TransactionId
@@ -70,19 +72,50 @@ class DockerToActivationLogStore(system: ActorSystem)
extends LogStore {
override def fetchLogs(activation: WhiskActivation, context: UserContext):
Future[ActivationLogs] =
Future.successful(activation.logs)
+  /**
+   * Obtains the container's stdout and stderr output as a raw log stream.
+   *
+   * In case of a timed out activation, do not wait for a sentinel to appear; instead
+   * collect the log as is and append a log line stating that data might be missing.
+   *
+   * @param transid transaction id
+   * @param container container to obtain the log from
+   * @param action action that defines the log limit and whether its logs are sentinelled
+   * @param isTimedoutActivation whether the activation ran into its time limit
+   *
+   * @return the container's log as a stream of ByteString chunks; for timed out activations
+   *         the stream ends with an additional stderr line carrying Messages.logFailure
+   */
+  protected def logStream(transid: TransactionId,
+                          container: Container,
+                          action: ExecutableWhiskAction,
+                          isTimedoutActivation: Boolean): Source[ByteString, Any] = {
+
+    // Only wait for the sentinel if the activation did not time out: the action code of a
+    // timed out activation may still be logging, which would keep log collection going.
+    val sentinel = action.exec.sentinelledLogs && !isTimedoutActivation
+    val logs = container.logs(action.limits.logs.asMegaBytes, sentinel)(transid)
+
+    if (isTimedoutActivation) {
+      // Append a marker line stating that the collected log might be incomplete.
+      logs.concat(
+        Source.single(ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)))
+    } else logs
+  }
+
override def collectLogs(transid: TransactionId,
user: Identity,
activation: WhiskActivation,
container: Container,
action: ExecutableWhiskAction):
Future[ActivationLogs] = {
- container
- .logs(action.limits.logs.asMegaBytes,
action.exec.sentinelledLogs)(transid)
+ val isTimedoutActivation = activation.isTimedoutActivation
+ val logs = logStream(transid, container, action, isTimedoutActivation)
+
+ logs
.via(DockerToActivationLogStore.toFormattedString)
.runWith(Sink.seq)
.flatMap { seq =>
val possibleErrors = Set(Messages.logFailure,
Messages.truncateLogs(action.limits.logs.asMegaBytes))
- val errored = seq.lastOption.exists(last =>
possibleErrors.exists(last.contains))
+ val errored = isTimedoutActivation || seq.lastOption.exists(last =>
possibleErrors.exists(last.contains))
val logs = ActivationLogs(seq.toVector)
if (!errored) {
Future.successful(logs)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
index 4222e51..5664c42 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
@@ -111,19 +111,23 @@ case class WhiskActivation(namespace: EntityPath,
}
}
- def metadata = {
+ def metadata =
copy(response = response.withoutResult, annotations = Parameters(), logs =
ActivationLogs())
.revision[WhiskActivation](rev)
- }
- def withoutResult = {
+
+ def withoutResult =
copy(response = response.withoutResult)
.revision[WhiskActivation](rev)
- }
- def withoutLogsOrResult = {
+
+ def withoutLogsOrResult =
copy(response = response.withoutResult, logs =
ActivationLogs()).revision[WhiskActivation](rev)
- }
+
def withoutLogs = copy(logs =
ActivationLogs()).revision[WhiskActivation](rev)
+
def withLogs(logs: ActivationLogs) = copy(logs =
logs).revision[WhiskActivation](rev)
+
+  /**
+   * Whether this activation is annotated as timed out via [[WhiskActivation.timeoutAnnotation]];
+   * defaults to false when the annotation cannot be obtained as a Boolean.
+   */
+  def isTimedoutActivation: Boolean =
+    annotations.getAs[Boolean](WhiskActivation.timeoutAnnotation).getOrElse(false)
+
}
object WhiskActivation
@@ -140,6 +144,7 @@ object WhiskActivation
val initTimeAnnotation = "initTime"
val waitTimeAnnotation = "waitTime"
val conductorAnnotation = "conductor"
+ val timeoutAnnotation = "timeout"
/** Some field names for compositions */
val actionField = "action"
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 7f45cc2..7419bf3 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -285,7 +285,7 @@ class ContainerProxy(
// also update the feed and active ack; the container cleanup is
queued
// implicitly via a FailureMessage which will be processed later
when the state
// transitions to Running
- val activation = ContainerProxy.constructWhiskActivation(job,
None, Interval.zero, response)
+ val activation = ContainerProxy.constructWhiskActivation(job,
None, Interval.zero, false, response)
sendActiveAck(
transid,
activation,
@@ -581,12 +581,22 @@ class ContainerProxy(
val initRunInterval = initInterval
.map(i =>
Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
.getOrElse(runInterval)
- ContainerProxy.constructWhiskActivation(job, initInterval,
initRunInterval, response)
+ ContainerProxy.constructWhiskActivation(
+ job,
+ initInterval,
+ initRunInterval,
+ runInterval.duration >= actionTimeout,
+ response)
}
}
.recover {
case InitializationError(interval, response) =>
- ContainerProxy.constructWhiskActivation(job, Some(interval),
interval, response)
+ ContainerProxy.constructWhiskActivation(
+ job,
+ Some(interval),
+ interval,
+ interval.duration >= actionTimeout,
+ response)
case t =>
// Actually, this should never happen - but we want to make sure to
not miss a problem
logging.error(this, s"caught unexpected error while running
activation: ${t}")
@@ -594,6 +604,7 @@ class ContainerProxy(
job,
None,
Interval.zero,
+ false,
ActivationResponse.whiskError(Messages.abnormalRun))
}
@@ -708,6 +719,7 @@ object ContainerProxy {
def constructWhiskActivation(job: Run,
initInterval: Option[Interval],
totalInterval: Interval,
+ isTimeout: Boolean,
response: ActivationResponse) = {
val causedBy = Some {
if (job.msg.causedBySequence) {
@@ -745,6 +757,7 @@ object ContainerProxy {
Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson)
++
Parameters(WhiskActivation.pathAnnotation,
JsString(job.action.fullyQualifiedName(false).asString)) ++
Parameters(WhiskActivation.kindAnnotation,
JsString(job.action.exec.kind)) ++
+ Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout))
++
causedBy ++ initTime
})
}
diff --git a/tests/dat/actions/loggingTimeout.js
b/tests/dat/actions/loggingTimeout.js
new file mode 100644
index 0000000..a386de0
--- /dev/null
+++ b/tests/dat/actions/loggingTimeout.js
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+// This action prints log lines for a configurable duration.
+// Consecutive log lines are separated by a configurable delay so that the
+// log stays small and within the log size limit.
+
+// Returns value unless it was not supplied (undefined or null), in which case defaultValue
+// is returned. An explicit 0 is honored instead of being silently replaced by the default.
+function getArg(value, defaultValue) {
+  return value === undefined || value === null ? defaultValue : value;
+}
+
+// input: { duration: <duration in millis>, delay: <delay in millis> }, e.g.
+// main( { delay: 100, duration: 10000 } );
+function main(args) {
+
+  // All state is local: implicit globals would throw in strict mode and would be shared
+  // (and clobbered) by overlapping invocations running in the same container.
+  const durationMillis = getArg(args.duration, 120000);
+  const delayMillis = getArg(args.delay, 100);
+
+  let logLines = 0;
+  const startMillis = Date.now();
+
+  const interval = setInterval(function() {
+    console.log(`[${ ++logLines }] The quick brown fox jumps over the lazy dog.`);
+  }, delayMillis);
+
+  return new Promise(function(resolve) {
+    setTimeout(function() {
+      clearInterval(interval);
+      const message = `hello, I'm back after ${Date.now() - startMillis} ms and printed ${logLines} log lines`;
+      console.log(message);
+      resolve({ message: message });
+    }, durationMillis);
+  });
+
+}
+
diff --git a/tests/src/test/scala/common/TimingHelpers.scala
b/tests/src/test/scala/common/TimingHelpers.scala
index d739222..8d3b2b8 100644
--- a/tests/src/test/scala/common/TimingHelpers.scala
+++ b/tests/src/test/scala/common/TimingHelpers.scala
@@ -21,10 +21,10 @@ import java.time.Instant
import scala.concurrent.duration._
trait TimingHelpers {
- def between(start: Instant, end: Instant): Duration =
- Duration.fromNanos(java.time.Duration.between(start, end).toNanos)
+ def between(start: Instant, end: Instant): FiniteDuration =
+ FiniteDuration(java.time.Duration.between(start, end).toNanos, NANOSECONDS)
- def durationOf[A](block: => A): (Duration, A) = {
+ def durationOf[A](block: => A): (FiniteDuration, A) = {
val start = Instant.now
val value = block
val end = Instant.now
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
index 23523c0..c70a8b1 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
@@ -22,6 +22,7 @@ import akka.http.scaladsl.model.StatusCodes.BadGateway
import java.io.File
import java.io.PrintWriter
import java.time.Instant
+
import scala.concurrent.duration.{Duration, DurationInt}
import scala.language.postfixOps
import org.junit.runner.RunWith
@@ -30,6 +31,7 @@ import common.ActivationResult
import common.TestHelpers
import common.TestUtils
import common.TestUtils.{BAD_REQUEST, DONTCARE_EXIT, SUCCESS_EXIT}
+import common.TimingHelpers
import common.WhiskProperties
import common.rest.WskRestOperations
import common.WskProps
@@ -51,7 +53,7 @@ import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
@RunWith(classOf[JUnitRunner])
-class ActionLimitsTests extends TestHelpers with WskTestHelpers with
WskActorSystem {
+class ActionLimitsTests extends TestHelpers with WskTestHelpers with
WskActorSystem with TimingHelpers {
implicit val wskprops = WskProps()
val wsk = new WskRestOperations
@@ -463,4 +465,36 @@ class ActionLimitsTests extends TestHelpers with
WskTestHelpers with WskActorSys
_.response.result.get.fields("error") shouldBe
Messages.memoryExhausted.toJson
}
}
+
+  /**
+   * Test that a heavily logging action is interrupted within its timeout limit and that
+   * its log is marked as possibly incomplete.
+   */
+  it should "interrupt the heavy logging action within its time limits" in withAssetCleaner(wskprops) {
+    (wp, assetHelper) =>
+      val name = s"NodeJsTestLoggingActionCausingTimeout-${System.currentTimeMillis()}"
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) =>
+        action.create(
+          name,
+          Some(TestUtils.getTestActionFilename("loggingTimeout.js")),
+          timeout = Some(allowedActionDuration))
+      }
+      // the action is asked to keep logging well beyond its time limit
+      val duration = allowedActionDuration + 3.minutes
+      val checkDuration = allowedActionDuration + 1.minutes
+      // Parameter names must match what loggingTimeout.js reads (args.duration, args.delay);
+      // unknown names are ignored and the action silently falls back to its built-in defaults.
+      val run =
+        wsk.action.invoke(name, Map("duration" -> duration.toMillis.toJson, "delay" -> 100.toJson))
+      withActivation(wsk.activation, run) { result =>
+        withClue("Activation result not as expected:") {
+          result.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
+          result.response.result.get
+            .fields("error") shouldBe Messages.timedoutActivation(allowedActionDuration, init = false).toJson
+          val logs = result.logs.get
+          logs should not be empty
+          // the log store appends a logFailure line to the logs of timed out activations
+          logs.last should include(Messages.logFailure)
+
+          // log lines are expected to start with an ISO-8601 timestamp followed by a space
+          val parseLogTime = (line: String) => Instant.parse(line.split(' ').head)
+          val startTime = parseLogTime(logs.head)
+          val endTime = parseLogTime(logs.last)
+          between(startTime, endTime).toMillis should be < checkDuration.toMillis
+        }
+      }
+  }
}