This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 92ac571 Action container log collection does not wait for sentinel on developer error (#4582) 92ac571 is described below commit 92ac571646f7c46eb0b9a2f7d729b0c8d7433cbb Author: Sven Lange-Last <sven.lange-l...@de.ibm.com> AuthorDate: Fri Aug 16 11:09:19 2019 +0200 Action container log collection does not wait for sentinel on developer error (#4582) * Use value definitions instead of literal values * Action container log collection does not wait for sentinel on developer error * So far, log collection for managed Docker action containers waits for sentinels - even if a developer error occurs. Only if an action exceeded its init or run timeout, sentinel waiting will be disabled. * This change disables sentinel waiting for all developer errors because a managed action runtime may not be able to write sentinels at all in developer error situations. With this change, missing sentinels are handled in a more robust way. * Without sentinel waiting, the Docker action container log file is only read until file end is reached. No re-reads are performed if the sentinel has not been found yet. On busy systems, this may lead to a few missing log lines in case of developer errors. * Add review feedback: reformat block comments * Fix ActionLimitsTests failure * Address review feedback * Improve error handling such that developer errors are no log collecting errors. Properly detect log collecting errors independently of developer errors. * Add unit test coverage for different error situations. 
* Address review feedback: add comments and centralize log collecting config --- .../logging/DockerToActivationFileLogStore.scala | 10 +-- .../logging/DockerToActivationLogStore.scala | 97 +++++++++++++++++----- .../openwhisk/core/entity/ActivationResult.scala | 11 +-- .../org/apache/openwhisk/http/ErrorResponse.scala | 2 + .../containerpool/docker/DockerContainer.scala | 46 ++++++++-- .../docker/test/DockerContainerTests.scala | 2 +- .../test/DockerToActivationFileLogStoreTests.scala | 60 +++++++++++-- .../test/DockerToActivationLogStoreTests.scala | 83 +++++++++++++++++- .../openwhisk/core/limits/ActionLimitsTests.scala | 2 +- 9 files changed, 265 insertions(+), 48 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala index 2c9c0a9..1d68710 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -40,7 +40,6 @@ import org.apache.openwhisk.common.{AkkaLogging, TransactionId} import org.apache.openwhisk.core.containerpool.Container import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import org.apache.openwhisk.core.entity.size._ -import org.apache.openwhisk.http.Messages import spray.json._ import spray.json.DefaultJsonProtocol._ @@ -123,8 +122,9 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] = { - val isTimedoutActivation = activation.isTimedoutActivation - val logs = logStream(transid, container, action, isTimedoutActivation) + val logLimit = action.limits.logs + val isDeveloperError = 
activation.response.isContainerError // container error means developer error + val logs = logStream(transid, container, logLimit, action.exec.sentinelledLogs, isDeveloperError) // Adding the userId field to every written record, so any background process can properly correlate. val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson) @@ -150,10 +150,8 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_)) logs.runWith(combined)._1.flatMap { seq => - val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) - val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) - if (!errored) { + if (!isLogCollectingError(logs, logLimit, isDeveloperError)) { Future.successful(logs) } else { Future.failed(LogCollectingException(logs)) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala index 4173a95..064c293 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala @@ -18,6 +18,7 @@ package org.apache.openwhisk.core.containerpool.logging import java.time.Instant + import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer @@ -25,15 +26,13 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Source import akka.util.ByteString - import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.core.containerpool.Container -import org.apache.openwhisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
Identity, WhiskActivation} +import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.http.Messages import org.apache.openwhisk.core.database.UserContext import scala.concurrent.{ExecutionContext, Future} - import spray.json._ /** @@ -75,8 +74,22 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore { /** * Obtains the container's stdout and stderr output. * - * In case of a timed out activation do not wait for a sentinel to appear but instead - * collect the log as is and add a message to the log that data might be missing + * Managed action runtimes are expected to produce sentinels on developer errors during + * init and run. For certain developer errors like process abortion due to unhandled errors + * or memory limit exhaustion, the action runtime will likely not be able to produce sentinels. + * + * In addition, there are situations where user actions (un)intentionally cause a developer error + * in a managed action runtime and prevent the production of sentinels. In that case, log file + * reading may continue endlessly. + * + * For these reasons, do not wait for sentinels to appear in log output when activations end up + * in a developer error. It is expected that sentinels are filtered in container.logs() even + * if they are not waited for. + * + * In case of a developer error, append a warning message to the logs that data might be missing. + * + * TODO: instead of just appending a warning message when a developer error occurs, we should + * have an out-of-band error handling that injects such messages later on. 
* * @param transid transaction id * @param container container to obtain the log from @@ -87,37 +100,83 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore { */ protected def logStream(transid: TransactionId, container: Container, - action: ExecutableWhiskAction, - isTimedoutActivation: Boolean): Source[ByteString, Any] = { - - // wait for a sentinel only if no container (developer) error occurred to avoid - // that log collection continues if the action code still logs after timeout - val sentinel = action.exec.sentinelledLogs && !isTimedoutActivation - val logs = container.logs(action.limits.logs.asMegaBytes, sentinel)(transid) - val logsWithPossibleError = if (isTimedoutActivation) { + logLimit: LogLimit, + sentinelledLogs: Boolean, + isDeveloperError: Boolean): Source[ByteString, Any] = { + + // Wait for a sentinel only if no container (developer) error occurred to avoid + // that log collection continues if the action code still logs after developer error. + val waitForSentinel = sentinelledLogs && !isDeveloperError + val logs = container.logs(logLimit.asMegaBytes, waitForSentinel)(transid) + val logsWithPossibleError = if (isDeveloperError) { logs.concat( - Source.single(ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint))) + Source.single( + ByteString(LogLine(Instant.now.toString, "stderr", Messages.logWarningDeveloperError).toJson.compactPrint))) } else logs logsWithPossibleError } + /** + * Determine whether the passed activation log had a log collecting error or not. + * It is expected that the log collecting stream appends a message from a well known + * set of error messages if log collecting failed. + * + * If the activation failed due to a developer error, an additional error message is appended. + * In that case, the second last message indicates whether there was a log collecting error AND + * the last message MUST be the additional error message mentioned above. 
+ * + * TODO: this function needs to deal with different combinations of error / warning messages that + * were appended to / injected into the log collecting stream. + * Instead, we should have an out-of-band error handling that does not use log messages to + * detect error conditions but detects errors and appends error / warning messages in + * a different way. + * + * @param actLogs the activation logs to check + * @param logLimit the log limit applying to the activation + * @param isDeveloperError did activation fail due to developer error? + * @return true if log collecting failed, false otherwise + */ + protected def isLogCollectingError(actLogs: ActivationLogs, + logLimit: LogLimit, + isDeveloperError: Boolean): Boolean = { + val logs = actLogs.logs + val logCollectingErrorMessages = Set(Messages.logFailure, Messages.truncateLogs(logLimit.asMegaBytes)) + val lastLine: Option[String] = logs.lastOption + val secondLastLine: Option[String] = logs.takeRight(2).dropRight(1).lastOption + + if (isDeveloperError) { + // Developer error: the second last line indicates whether there was a log collecting error. + val secondLastLineContainsLogCollectingError = + secondLastLine.exists(line => logCollectingErrorMessages.exists(line.contains)) + + // If a developer error occurred when initializing or running an action, + // the last message in logs must be Messages.logWarningDeveloperError. + // If not, this is a log collecting error. + val lastLineContainsDeveloperError = lastLine.exists(line => line.contains(Messages.logWarningDeveloperError)) + + secondLastLineContainsLogCollectingError || !lastLineContainsDeveloperError + } else { + // The last line indicates whether there was a log collecting error. 
+ lastLine.exists(line => logCollectingErrorMessages.exists(line.contains)) + } + } + override def collectLogs(transid: TransactionId, user: Identity, activation: WhiskActivation, container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] = { - val isTimedoutActivation = activation.isTimedoutActivation - val logs = logStream(transid, container, action, isTimedoutActivation) + val logLimit = action.limits.logs + val isDeveloperError = activation.response.isContainerError // container error means developer error + val logs = logStream(transid, container, logLimit, action.exec.sentinelledLogs, isDeveloperError) logs .via(DockerToActivationLogStore.toFormattedString) .runWith(Sink.seq) .flatMap { seq => - val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) - val errored = isTimedoutActivation || seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) - if (!errored) { + if (!isLogCollectingError(logs, logLimit, isDeveloperError)) { Future.successful(logs) } else { Future.failed(LogCollectingException(logs)) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala index c7764cd..98f241d 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala @@ -55,18 +55,19 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol { /* The field name that is universally recognized as the marker of an error, from the application or otherwise. 
*/ val ERROR_FIELD: String = "error" + // These constants need to be synchronized with messageForCode() method below val Success = 0 // action ran successfully and produced a result val ApplicationError = 1 // action ran but there was an error and it was handled val DeveloperError = 2 // action ran but failed to handle an error, or action did not run and failed to initialize val WhiskError = 3 // internal system error protected[core] def messageForCode(code: Int) = { - require(code >= 0 && code <= 3) + require(code >= Success && code <= WhiskError) code match { - case 0 => "success" - case 1 => "application error" - case 2 => "action developer error" - case 3 => "whisk internal error" + case Success => "success" + case ApplicationError => "application error" + case DeveloperError => "action developer error" + case WhiskError => "whisk internal error" } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala index b448459..cd944f9 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala @@ -166,6 +166,8 @@ object Messages { } val logFailure = "There was an issue while collecting your logs. Data might be missing." + val logWarningDeveloperError = "The action did not initialize or run as expected. Log data might be missing." + /** Error for meta api. */ val propertyNotFound = "Response does not include requested property." def invalidMedia(m: MediaType) = s"Response is not valid '${m.value}'." 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala index 731966b..ec75a77 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala @@ -173,7 +173,8 @@ class DockerContainer(protected val id: ContainerId, /** The last read-position in the log file */ private var logFileOffset = new AtomicLong(0) - protected val waitForLogs: FiniteDuration = 2.seconds + protected val logCollectingIdleTimeout: FiniteDuration = 2.seconds + protected val logCollectingTimeoutPerMBLogLimit: FiniteDuration = 2.seconds protected val waitForOomState: FiniteDuration = 2.seconds protected val filePollInterval: FiniteDuration = 5.milliseconds @@ -257,17 +258,46 @@ class DockerContainer(protected val id: ContainerId, * previous activations that have to be skipped. For this reason, a starting position * is kept and updated upon each invocation. * - * If asked, check for sentinel markers - but exclude the identified markers from - * the result returned from this method. + * There are two possible modes controlled by parameter waitForSentinel: * - * Only parses and returns as much logs as fit in the passed log limit. + * 1. Wait for sentinel: + * Tail container log file until two sentinel markers show up. Complete + * once two sentinel markers have been identified, regardless whether more + * data could be read from container log file. + * A log file reading error is reported if sentinels cannot be found. + * Managed action runtimes use the sentinels to mark the end of + * an individual activation. + * + * 2. Do not wait for sentinel: + * Read container log file up to its end. Stop reading once the end + * has been reached. 
Complete once two sentinel markers have been + * identified, regardless whether more data could be read from + * container log file. + * No log file reading error is reported if sentinels cannot be found. + * Blackbox actions do not necessarily produce marker sentinels properly, + * so this mode is used for all blackbox actions. + * In addition, this mode can / should be used in error situations with + * managed action runtimes where sentinel markers may be missing or + * arrive too late - Example: action exceeds time or memory limit during + * init or run. + * + * The result returned from this method never contains any log sentinel markers. These are always + * filtered - regardless of the specified waitForSentinel mode. + * + * Only parses and returns as much logs as fit in the passed log limit. Stops log collection with an error + * if processing takes too long or time gaps between processing individual log lines are too long. * * @param limit the limit to apply to the log size * @param waitForSentinel determines if the processor should wait for a sentinel to appear - * * @return a vector of Strings with log lines in our own JSON format */ def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = { + // Define a time limit for collecting and processing the logs of a single activation. + // If this time limit is exceeded, log processing is stopped and declared unsuccessful. + // Calculate the timeout based on the maximum expected log size, i.e. the log limit. + // Use a lower bound of 5 MB log size to account for base overhead. 
+ val logCollectingTimeout = limit.toMB.toInt.max(5) * logCollectingTimeoutPerMBLogLimit + docker .rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel) Some(filePollInterval) else None) // This stage only throws 'FramingException' so we cannot decide whether we got truncated due to a size @@ -281,9 +311,11 @@ class DockerContainer(protected val id: ContainerId, } .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.byteStringSentinel), 2, waitForSentinel)) // As we're reading the logs after the activation has finished the invariant is that all loglines are already - // written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time + // written and we mostly await them being flushed by the docker daemon. Therefore we can time out based on the time // between two loglines appear without relying on the log frequency in the action itself. - .idleTimeout(waitForLogs) + .idleTimeout(logCollectingIdleTimeout) + // Apply an overall time limit for this log collecting and processing stream. 
+ .completionTimeout(logCollectingTimeout) .recover { case _: StreamLimitReachedException => // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala index 5edef3c..32f1987 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -115,7 +115,7 @@ class DockerContainerTests retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = { ccRes } - override protected val waitForLogs = awaitLogs + override protected val logCollectingIdleTimeout = awaitLogs override protected val filePollInterval = 1.millisecond } } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala index db9c4e2..92ded44 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala @@ -29,8 +29,9 @@ import org.scalatest.junit.JUnitRunner import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.TransactionId -import org.apache.openwhisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine} +import org.apache.openwhisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogCollectingException, LogLine} import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.http.Messages /** * Includes the tests for the 
DockerToActivationLogStore since the behavior towards the activation storage should @@ -60,7 +61,7 @@ class DockerToActivationFileLogStoreTests JsObject(activation.toJson.fields ++ Map("namespaceId" -> user.namespace.uuid.asString.toJson)).compactPrint + "\n" } - behavior of "DockerCouchDbFileLogStore" + behavior of "DockerToActivationFileLogStore" it should "read logs returned by the container,in mem and enrich + write them to the provided sink" in { val logs = List(LogLine(Instant.now.toString, "stdout", "this is just a test")) @@ -72,16 +73,65 @@ class DockerToActivationFileLogStoreTests val container = new TestContainer(testSource) val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ()))) - val collected = store.collectLogs(TransactionId.testing, user, activation, container, action) + val collected = store.collectLogs(TransactionId.testing, user, successfulActivation, container, action) await(collected) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) logs.foreach { line => testActor.expectMsg( - toLoggedEvent(line, user.namespace.uuid, activation.activationId, action.fullyQualifiedName(false))) + toLoggedEvent(line, user.namespace.uuid, successfulActivation.activationId, action.fullyQualifiedName(false))) } // Last message should be the full activation - testActor.expectMsg(toLoggedActivation(activation)) + testActor.expectMsg(toLoggedActivation(successfulActivation)) + } + + it should "read logs with log collecting error with developer error" in { + val logs = List( + LogLine(Instant.now.toString, "stdout", "this is a log"), + LogLine(Instant.now.toString, "stderr", Messages.logFailure)) + + val testActor = TestProbe() + + val container = new TestContainer(Source(toByteString(logs))) + val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ()))) + + val ex = the[LogCollectingException] thrownBy await( + store.collectLogs(TransactionId.testing, user, 
developerErrorActivation, container, action)) + val collectedLogs = ex.partialLogs.logs + + withClue("Collected logs should match provided logs:") { + collectedLogs.dropRight(1) shouldBe logs.map(_.toFormattedString).toVector + } + + withClue("Last line should end with developer error warning:") { + val lastLogLine = collectedLogs.last + lastLogLine should endWith(Messages.logWarningDeveloperError) + } + + withClue("Provided logs should be received by log store:") { + logs.foreach { line => + testActor.expectMsg( + toLoggedEvent( + line, + user.namespace.uuid, + developerErrorActivation.activationId, + action.fullyQualifiedName(false))) + } + } + + withClue("Last line received by log store should contain developer error warning:") { + testActor.expectMsgPF() { + case s: String => + val ll = s.parseJson.convertTo[LogLine] + ll.log shouldBe Messages.logWarningDeveloperError + case _ => fail() + } + } + + withClue("Last message received by log store should be the activation record:") { + testActor + .expectMsg(toLoggedActivation(developerErrorActivation)) + } } class TestLogStoreTo(override val writeToFile: Sink[ByteString, _]) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala index 6f4caf2..b6118eb 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala @@ -30,12 +30,14 @@ import org.apache.openwhisk.core.containerpool.logging.{ import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import org.apache.openwhisk.core.entity._ import java.time.Instant + import akka.stream.scaladsl.Source import akka.util.ByteString import spray.json._ import 
org.apache.openwhisk.common.{Logging, TransactionId} import org.apache.openwhisk.core.containerpool.{Container, ContainerAddress, ContainerId} import org.apache.openwhisk.http.Messages + import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ @@ -48,7 +50,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct Identity(Subject(), Namespace(EntityName("testSpace"), uuid), BasicAuthenticationAuthKey(uuid, Secret()), Set.empty) val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None) val action = ExecutableWhiskAction(user.namespace.name.toPath, EntityName("actionName"), exec) - val activation = + val successfulActivation = WhiskActivation( user.namespace.name.toPath, action.name, @@ -56,6 +58,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct ActivationId.generate(), Instant.EPOCH, Instant.EPOCH) + val developerErrorActivation = successfulActivation.copy(response = ActivationResponse.developerError("failed")) def toByteString(logs: List[LogLine]) = logs.map(_.toJson.compactPrint).map(ByteString.apply) @@ -73,10 +76,58 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct LogLine(Instant.now.toString, "stdout", "this is a log too")) val container = new TestContainer(Source(toByteString(logs))) - await(store.collectLogs(tid, user, activation, container, action)) shouldBe ActivationLogs( + await(store.collectLogs(tid, user, successfulActivation, container, action)) shouldBe ActivationLogs( logs.map(_.toFormattedString).toVector) } + it should "read logs into a sequence and parse them into the specified format with developer error" in { + val store = createStore() + + val logs = List( + LogLine(Instant.now.toString, "stdout", "this is a log"), + LogLine(Instant.now.toString, "stdout", "this is a log too")) + val container = new TestContainer(Source(toByteString(logs))) + + val collectedLogs = 
await(store.collectLogs(tid, user, developerErrorActivation, container, action)).logs + + withClue("Collected logs should be empty:") { + collectedLogs.dropRight(1) shouldBe logs.map(_.toFormattedString).toVector + } + + withClue("Last line should end with developer error warning:") { + val lastLogLine = collectedLogs.last + lastLogLine should endWith(Messages.logWarningDeveloperError) + } + } + + it should "accept an empty log" in { + val store = createStore() + + val logs = List.empty[LogLine] + val container = new TestContainer(Source(toByteString(logs))) + + await(store.collectLogs(tid, user, successfulActivation, container, action)) shouldBe ActivationLogs( + Vector.empty[String]) + } + + it should "accept an empty log with developer error" in { + val store = createStore() + + val logs = List.empty[LogLine] + val container = new TestContainer(Source(toByteString(logs))) + + val collectedLogs = await(store.collectLogs(tid, user, developerErrorActivation, container, action)).logs + + withClue("Collected logs should be empty:") { + collectedLogs.dropRight(1) shouldBe Vector.empty[String] + } + + withClue("Last line should end with developer error warning:") { + val lastLogLine = collectedLogs.last + lastLogLine should endWith(Messages.logWarningDeveloperError) + } + } + it should "report an error if the logs contain an 'official' notice of such" in { val store = createStore() @@ -85,10 +136,33 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct LogLine(Instant.now.toString, "stderr", Messages.logFailure)) val container = new TestContainer(Source(toByteString(logs))) - val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action)) + val ex = the[LogCollectingException] thrownBy await( + store.collectLogs(tid, user, successfulActivation, container, action)) ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) } + it should "report an error if the logs contain 
an 'official' notice of such with developer error" in { + val store = createStore() + + val logs = List( + LogLine(Instant.now.toString, "stdout", "this is a log"), + LogLine(Instant.now.toString, "stderr", Messages.logFailure)) + val container = new TestContainer(Source(toByteString(logs))) + + val ex = the[LogCollectingException] thrownBy await( + store.collectLogs(tid, user, developerErrorActivation, container, action)) + val collectedLogs = ex.partialLogs.logs + + withClue("Collected logs should match provided logs:") { + collectedLogs.dropRight(1) shouldBe logs.map(_.toFormattedString).toVector + } + + withClue("Last line should end with developer error warning:") { + val lastLogLine = collectedLogs.last + lastLogLine should endWith(Messages.logWarningDeveloperError) + } + } + it should "report an error if logs have been truncated" in { val store = createStore() @@ -97,7 +171,8 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(action.limits.logs.asMegaBytes))) val container = new TestContainer(Source(toByteString(logs))) - val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action)) + val ex = the[LogCollectingException] thrownBy await( + store.collectLogs(tid, user, successfulActivation, container, action)) ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala index c317373..e5d7286 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala @@ -497,7 +497,7 @@ class ActionLimitsTests extends TestHelpers with WskTestHelpers with WskActorSys result.response.result.get .fields("error") shouldBe 
Messages.timedoutActivation(allowedActionDuration, init = false).toJson val logs = result.logs.get - logs.last should include(Messages.logFailure) + logs.last should include(Messages.logWarningDeveloperError) val parseLogTime = (line: String) => Instant.parse(line.split(' ').head) val startTime = parseLogTime(logs.head)