This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ac571  Action container log collection does not wait for sentinel on 
developer error (#4582)
92ac571 is described below

commit 92ac571646f7c46eb0b9a2f7d729b0c8d7433cbb
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Fri Aug 16 11:09:19 2019 +0200

    Action container log collection does not wait for sentinel on developer 
error (#4582)
    
    * Use value definitions instead of literal values
    
    * Action container log collection does not wait for sentinel on developer 
error
    
    * So far, log collection for managed Docker action containers waits for 
sentinels - even if a developer error occurs. Only if an action exceeded its 
init or run timeout, sentinel waiting will be disabled.
    * This change disables sentinel waiting for all developer errors because a 
managed action runtime may not be able to write sentinels at all in developer 
error situations. With this change, missing sentinels are handled in a more 
robust way.
    * Without sentinel waiting, the Docker action container log file is only 
read until file end is reached. No re-reads are performed if the sentinel has 
not been found yet. On busy systems, this may lead to a few missing log lines 
in case of developer errors.
    
    * Add review feedback: reformat block comments
    
    * Fix ActionLimitsTests failure
    
    * Address review feedback
    
    * Improve error handling such that developer errors are no log collecting 
errors. Properly detect log collecting errors independently of developer errors.
    * Add unit test coverage for different error situations.
    
    * Address review feedback: add comments and centralize log collecting config
---
 .../logging/DockerToActivationFileLogStore.scala   | 10 +--
 .../logging/DockerToActivationLogStore.scala       | 97 +++++++++++++++++-----
 .../openwhisk/core/entity/ActivationResult.scala   | 11 +--
 .../org/apache/openwhisk/http/ErrorResponse.scala  |  2 +
 .../containerpool/docker/DockerContainer.scala     | 46 ++++++++--
 .../docker/test/DockerContainerTests.scala         |  2 +-
 .../test/DockerToActivationFileLogStoreTests.scala | 60 +++++++++++--
 .../test/DockerToActivationLogStoreTests.scala     | 83 +++++++++++++++++-
 .../openwhisk/core/limits/ActionLimitsTests.scala  |  2 +-
 9 files changed, 265 insertions(+), 48 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 2c9c0a9..1d68710 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -40,7 +40,6 @@ import org.apache.openwhisk.common.{AkkaLogging, 
TransactionId}
 import org.apache.openwhisk.core.containerpool.Container
 import org.apache.openwhisk.core.entity.{ActivationLogs, 
ExecutableWhiskAction, Identity, WhiskActivation}
 import org.apache.openwhisk.core.entity.size._
-import org.apache.openwhisk.http.Messages
 
 import spray.json._
 import spray.json.DefaultJsonProtocol._
@@ -123,8 +122,9 @@ class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory:
                            container: Container,
                            action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
 
-    val isTimedoutActivation = activation.isTimedoutActivation
-    val logs = logStream(transid, container, action, isTimedoutActivation)
+    val logLimit = action.limits.logs
+    val isDeveloperError = activation.response.isContainerError // container 
error means developer error
+    val logs = logStream(transid, container, logLimit, 
action.exec.sentinelledLogs, isDeveloperError)
 
     // Adding the userId field to every written record, so any background 
process can properly correlate.
     val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
@@ -150,10 +150,8 @@ class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory:
     val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_))
 
     logs.runWith(combined)._1.flatMap { seq =>
-      val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
-      val errored = isTimedoutActivation || seq.lastOption.exists(last => 
possibleErrors.exists(last.contains))
       val logs = ActivationLogs(seq.toVector)
-      if (!errored) {
+      if (!isLogCollectingError(logs, logLimit, isDeveloperError)) {
         Future.successful(logs)
       } else {
         Future.failed(LogCollectingException(logs))
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 4173a95..064c293 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -18,6 +18,7 @@
 package org.apache.openwhisk.core.containerpool.logging
 
 import java.time.Instant
+
 import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
@@ -25,15 +26,13 @@ import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Flow
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.containerpool.Container
-import org.apache.openwhisk.core.entity.{ActivationLogs, 
ExecutableWhiskAction, Identity, WhiskActivation}
+import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
 
 import scala.concurrent.{ExecutionContext, Future}
-
 import spray.json._
 
 /**
@@ -75,8 +74,22 @@ class DockerToActivationLogStore(system: ActorSystem) 
extends LogStore {
   /**
    * Obtains the container's stdout and stderr output.
    *
-   * In case of a timed out activation do not wait for a sentinel to appear 
but instead
-   * collect the log as is and add a message to the log that data might be 
missing
+   * Managed action runtimes are expected to produce sentinels on developer 
errors during
+   * init and run. For certain developer errors like process abortion due to 
unhandled errors
+   * or memory limit exhaustion, the action runtime will likely not be able to 
produce sentinels.
+   *
+   * In addition, there are situations where user actions (un)intentionally 
cause a developer error
+   * in a managed action runtime and prevent the production of sentinels. In 
that case, log file
+   * reading may continue endlessly.
+   *
+   * For these reasons, do not wait for sentinels to appear in log output when 
activations end up
+   * in a developer error. It is expected that sentinels are filtered in 
container.logs() even
+   * if they are not waited for.
+   *
+   * In case of a developer error, append a warning message to the logs that 
data might be missing.
+   *
+   * TODO: instead of just appending a warning message when a developer error 
occurs, we should
+   *       have an out-of-band error handling that injects such messages later 
on.
    *
    * @param transid transaction id
    * @param container container to obtain the log from
@@ -87,37 +100,83 @@ class DockerToActivationLogStore(system: ActorSystem) 
extends LogStore {
    */
   protected def logStream(transid: TransactionId,
                           container: Container,
-                          action: ExecutableWhiskAction,
-                          isTimedoutActivation: Boolean): Source[ByteString, 
Any] = {
-
-    // wait for a sentinel only if no container (developer) error occurred to 
avoid
-    // that log collection continues if the action code still logs after 
timeout
-    val sentinel = action.exec.sentinelledLogs && !isTimedoutActivation
-    val logs = container.logs(action.limits.logs.asMegaBytes, 
sentinel)(transid)
-    val logsWithPossibleError = if (isTimedoutActivation) {
+                          logLimit: LogLimit,
+                          sentinelledLogs: Boolean,
+                          isDeveloperError: Boolean): Source[ByteString, Any] 
= {
+
+    // Wait for a sentinel only if no container (developer) error occurred to 
avoid
+    // that log collection continues if the action code still logs after 
developer error.
+    val waitForSentinel = sentinelledLogs && !isDeveloperError
+    val logs = container.logs(logLimit.asMegaBytes, waitForSentinel)(transid)
+    val logsWithPossibleError = if (isDeveloperError) {
       logs.concat(
-        Source.single(ByteString(LogLine(Instant.now.toString, "stderr", 
Messages.logFailure).toJson.compactPrint)))
+        Source.single(
+          ByteString(LogLine(Instant.now.toString, "stderr", 
Messages.logWarningDeveloperError).toJson.compactPrint)))
     } else logs
     logsWithPossibleError
   }
 
+  /**
+   * Determine whether the passed activation log had a log collecting error or 
not.
+   * It is expected that the log collecting stream appends a message from a 
well known
+   * set of error messages if log collecting failed.
+   *
+   * If the activation failed due to a developer error, an additional error 
message is appended.
+   * In that case, the second last message indicates whether there was a log 
collecting error AND
+   * the last message MUST be the additional error message mentioned above.
+   *
+   * TODO: this function needs to deal with different combinations of error / 
warning messages that
+   *       were appended to / injected into the log collecting stream.
+   *       Instead, we should have an out-of-band error handling that does not 
use log messages to
+   *       detect error conditions but detects errors and appends error / 
warning messages in
+   *       a different way.
+   *
+   * @param actLogs the activation logs to check
+   * @param logLimit the log limit applying to the activation
+   * @param isDeveloperError did activation fail due to developer error?
+   * @return true if log collecting failed, false otherwise
+   */
+  protected def isLogCollectingError(actLogs: ActivationLogs,
+                                     logLimit: LogLimit,
+                                     isDeveloperError: Boolean): Boolean = {
+    val logs = actLogs.logs
+    val logCollectingErrorMessages = Set(Messages.logFailure, 
Messages.truncateLogs(logLimit.asMegaBytes))
+    val lastLine: Option[String] = logs.lastOption
+    val secondLastLine: Option[String] = 
logs.takeRight(2).dropRight(1).lastOption
+
+    if (isDeveloperError) {
+      // Developer error: the second last line indicates whether there was a 
log collecting error.
+      val secondLastLineContainsLogCollectingError =
+        secondLastLine.exists(line => 
logCollectingErrorMessages.exists(line.contains))
+
+      // If a developer error occurred when initializing or running an action,
+      // the last message in logs must be Messages.logWarningDeveloperError.
+      // If not, this is a log collecting error.
+      val lastLineContainsDeveloperError = lastLine.exists(line => 
line.contains(Messages.logWarningDeveloperError))
+
+      secondLastLineContainsLogCollectingError || 
!lastLineContainsDeveloperError
+    } else {
+      // The last line indicates whether there was a log collecting error.
+      lastLine.exists(line => logCollectingErrorMessages.exists(line.contains))
+    }
+  }
+
   override def collectLogs(transid: TransactionId,
                            user: Identity,
                            activation: WhiskActivation,
                            container: Container,
                            action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
 
-    val isTimedoutActivation = activation.isTimedoutActivation
-    val logs = logStream(transid, container, action, isTimedoutActivation)
+    val logLimit = action.limits.logs
+    val isDeveloperError = activation.response.isContainerError // container 
error means developer error
+    val logs = logStream(transid, container, logLimit, 
action.exec.sentinelledLogs, isDeveloperError)
 
     logs
       .via(DockerToActivationLogStore.toFormattedString)
       .runWith(Sink.seq)
       .flatMap { seq =>
-        val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
-        val errored = isTimedoutActivation || seq.lastOption.exists(last => 
possibleErrors.exists(last.contains))
         val logs = ActivationLogs(seq.toVector)
-        if (!errored) {
+        if (!isLogCollectingError(logs, logLimit, isDeveloperError)) {
           Future.successful(logs)
         } else {
           Future.failed(LogCollectingException(logs))
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
index c7764cd..98f241d 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationResult.scala
@@ -55,18 +55,19 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
   /* The field name that is universally recognized as the marker of an error, 
from the application or otherwise. */
   val ERROR_FIELD: String = "error"
 
+  // These constants need to be synchronized with messageForCode() method below
   val Success = 0 // action ran successfully and produced a result
   val ApplicationError = 1 // action ran but there was an error and it was 
handled
   val DeveloperError = 2 // action ran but failed to handle an error, or 
action did not run and failed to initialize
   val WhiskError = 3 // internal system error
 
   protected[core] def messageForCode(code: Int) = {
-    require(code >= 0 && code <= 3)
+    require(code >= Success && code <= WhiskError)
     code match {
-      case 0 => "success"
-      case 1 => "application error"
-      case 2 => "action developer error"
-      case 3 => "whisk internal error"
+      case Success          => "success"
+      case ApplicationError => "application error"
+      case DeveloperError   => "action developer error"
+      case WhiskError       => "whisk internal error"
     }
   }
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
index b448459..cd944f9 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/ErrorResponse.scala
@@ -166,6 +166,8 @@ object Messages {
   }
   val logFailure = "There was an issue while collecting your logs. Data might 
be missing."
 
+  val logWarningDeveloperError = "The action did not initialize or run as 
expected. Log data might be missing."
+
   /** Error for meta api. */
   val propertyNotFound = "Response does not include requested property."
   def invalidMedia(m: MediaType) = s"Response is not valid '${m.value}'."
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index 731966b..ec75a77 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -173,7 +173,8 @@ class DockerContainer(protected val id: ContainerId,
   /** The last read-position in the log file */
   private var logFileOffset = new AtomicLong(0)
 
-  protected val waitForLogs: FiniteDuration = 2.seconds
+  protected val logCollectingIdleTimeout: FiniteDuration = 2.seconds
+  protected val logCollectingTimeoutPerMBLogLimit: FiniteDuration = 2.seconds
   protected val waitForOomState: FiniteDuration = 2.seconds
   protected val filePollInterval: FiniteDuration = 5.milliseconds
 
@@ -257,17 +258,46 @@ class DockerContainer(protected val id: ContainerId,
    * previous activations that have to be skipped. For this reason, a starting 
position
    * is kept and updated upon each invocation.
    *
-   * If asked, check for sentinel markers - but exclude the identified markers 
from
-   * the result returned from this method.
+   * There are two possible modes controlled by parameter waitForSentinel:
    *
-   * Only parses and returns as much logs as fit in the passed log limit.
+   * 1. Wait for sentinel:
+   *    Tail container log file until two sentinel markers show up. Complete
+   *    once two sentinel markers have been identified, regardless whether more
+   *    data could be read from container log file.
+   *    A log file reading error is reported if sentinels cannot be found.
+   *    Managed action runtimes use the the sentinels to mark the end of
+   *    an individual activation.
+   *
+   * 2. Do not wait for sentinel:
+   *    Read container log file up to its end. Stop reading once the end
+   *    has been reached. Complete once two sentinel markers have been
+   *    identified, regardless whether more data could be read from
+   *    container log file.
+   *    No log file reading error is reported if sentinels cannot be found.
+   *    Blackbox actions do not necessarily produce marker sentinels properly,
+   *    so this mode is used for all blackbox actions.
+   *    In addition, this mode can / should be used in error situations with
+   *    managed action runtimes where sentinel markers may be missing or
+   *    arrive too late - Example: action exceeds time or memory limit during
+   *    init or run.
+   *
+   * The result returned from this method does never contain any log sentinel 
markers. These are always
+   * filtered - regardless of the specified waitForSentinel mode.
+   *
+   * Only parses and returns as much logs as fit in the passed log limit. 
Stops log collection with an error
+   * if processing takes too long or time gaps between processing individual 
log lines are too long.
    *
    * @param limit the limit to apply to the log size
    * @param waitForSentinel determines if the processor should wait for a 
sentinel to appear
-   *
    * @return a vector of Strings with log lines in our own JSON format
    */
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: 
TransactionId): Source[ByteString, Any] = {
+    // Define a time limit for collecting and processing the logs of a single 
activation.
+    // If this time limit is exceeded, log processing is stopped and declared 
unsuccessful.
+    // Calculate the timeout based on the maximum expected log size, i.e. the 
log limit.
+    // Use a lower bound of 5 MB log size to account for base overhead.
+    val logCollectingTimeout = limit.toMB.toInt.max(5) * 
logCollectingTimeoutPerMBLogLimit
+
     docker
       .rawContainerLogs(id, logFileOffset.get(), if (waitForSentinel) 
Some(filePollInterval) else None)
       // This stage only throws 'FramingException' so we cannot decide whether 
we got truncated due to a size
@@ -281,9 +311,11 @@ class DockerContainer(protected val id: ContainerId,
       }
       .via(new 
CompleteAfterOccurrences(_.containsSlice(DockerContainer.byteStringSentinel), 
2, waitForSentinel))
       // As we're reading the logs after the activation has finished the 
invariant is that all loglines are already
-      // written and we mostly await them being flushed by the docker daemon. 
Therefore we can timeout based on the time
+      // written and we mostly await them being flushed by the docker daemon. 
Therefore we can time out based on the time
       // between two loglines appear without relying on the log frequency in 
the action itself.
-      .idleTimeout(waitForLogs)
+      .idleTimeout(logCollectingIdleTimeout)
+      // Apply an overall time limit for this log collecting and processing 
stream.
+      .completionTimeout(logCollectingTimeout)
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted 
stage above, we inject a truncation
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index 5edef3c..32f1987 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -115,7 +115,7 @@ class DockerContainerTests
         retry: Boolean = false)(implicit transid: TransactionId): 
Future[RunResult] = {
         ccRes
       }
-      override protected val waitForLogs = awaitLogs
+      override protected val logCollectingIdleTimeout = awaitLogs
       override protected val filePollInterval = 1.millisecond
     }
   }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
index db9c4e2..92ded44 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
@@ -29,8 +29,9 @@ import org.scalatest.junit.JUnitRunner
 import spray.json.DefaultJsonProtocol._
 import spray.json._
 import org.apache.openwhisk.common.TransactionId
-import 
org.apache.openwhisk.core.containerpool.logging.{DockerToActivationFileLogStore,
 LogLine}
+import 
org.apache.openwhisk.core.containerpool.logging.{DockerToActivationFileLogStore,
 LogCollectingException, LogLine}
 import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.http.Messages
 
 /**
  * Includes the tests for the DockerToActivationLogStore since the behavior 
towards the activation storage should
@@ -60,7 +61,7 @@ class DockerToActivationFileLogStoreTests
     JsObject(activation.toJson.fields ++ Map("namespaceId" -> 
user.namespace.uuid.asString.toJson)).compactPrint + "\n"
   }
 
-  behavior of "DockerCouchDbFileLogStore"
+  behavior of "DockerToActivationFileLogStore"
 
   it should "read logs returned by the container,in mem and enrich + write 
them to the provided sink" in {
     val logs = List(LogLine(Instant.now.toString, "stdout", "this is just a 
test"))
@@ -72,16 +73,65 @@ class DockerToActivationFileLogStoreTests
     val container = new TestContainer(testSource)
     val store = new 
TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref,
 ())))
 
-    val collected = store.collectLogs(TransactionId.testing, user, activation, 
container, action)
+    val collected = store.collectLogs(TransactionId.testing, user, 
successfulActivation, container, action)
 
     await(collected) shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
     logs.foreach { line =>
       testActor.expectMsg(
-        toLoggedEvent(line, user.namespace.uuid, activation.activationId, 
action.fullyQualifiedName(false)))
+        toLoggedEvent(line, user.namespace.uuid, 
successfulActivation.activationId, action.fullyQualifiedName(false)))
     }
 
     // Last message should be the full activation
-    testActor.expectMsg(toLoggedActivation(activation))
+    testActor.expectMsg(toLoggedActivation(successfulActivation))
+  }
+
+  it should "read logs with log collecting error with developer error" in {
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stderr", Messages.logFailure))
+
+    val testActor = TestProbe()
+
+    val container = new TestContainer(Source(toByteString(logs)))
+    val store = new 
TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref,
 ())))
+
+    val ex = the[LogCollectingException] thrownBy await(
+      store.collectLogs(TransactionId.testing, user, developerErrorActivation, 
container, action))
+    val collectedLogs = ex.partialLogs.logs
+
+    withClue("Collected logs should match provided logs:") {
+      collectedLogs.dropRight(1) shouldBe 
logs.map(_.toFormattedString).toVector
+    }
+
+    withClue("Last line should end with developer error warning:") {
+      val lastLogLine = collectedLogs.last
+      lastLogLine should endWith(Messages.logWarningDeveloperError)
+    }
+
+    withClue("Provided logs should be received by log store:") {
+      logs.foreach { line =>
+        testActor.expectMsg(
+          toLoggedEvent(
+            line,
+            user.namespace.uuid,
+            developerErrorActivation.activationId,
+            action.fullyQualifiedName(false)))
+      }
+    }
+
+    withClue("Last line received by log store should contain developer error 
warning:") {
+      testActor.expectMsgPF() {
+        case s: String =>
+          val ll = s.parseJson.convertTo[LogLine]
+          ll.log shouldBe Messages.logWarningDeveloperError
+        case _ => fail()
+      }
+    }
+
+    withClue("Last message received by log store should be the activation 
record:") {
+      testActor
+        .expectMsg(toLoggedActivation(developerErrorActivation))
+    }
   }
 
   class TestLogStoreTo(override val writeToFile: Sink[ByteString, _])
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 6f4caf2..b6118eb 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -30,12 +30,14 @@ import org.apache.openwhisk.core.containerpool.logging.{
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, 
RuntimeManifest}
 import org.apache.openwhisk.core.entity._
 import java.time.Instant
+
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
 import spray.json._
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.containerpool.{Container, ContainerAddress, 
ContainerId}
 import org.apache.openwhisk.http.Messages
+
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 
@@ -48,7 +50,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with 
Matchers with WskAct
     Identity(Subject(), Namespace(EntityName("testSpace"), uuid), 
BasicAuthenticationAuthKey(uuid, Secret()), Set.empty)
   val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
   val action = ExecutableWhiskAction(user.namespace.name.toPath, 
EntityName("actionName"), exec)
-  val activation =
+  val successfulActivation =
     WhiskActivation(
       user.namespace.name.toPath,
       action.name,
@@ -56,6 +58,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with 
Matchers with WskAct
       ActivationId.generate(),
       Instant.EPOCH,
       Instant.EPOCH)
+  val developerErrorActivation = successfulActivation.copy(response = 
ActivationResponse.developerError("failed"))
 
   def toByteString(logs: List[LogLine]) = 
logs.map(_.toJson.compactPrint).map(ByteString.apply)
 
@@ -73,10 +76,58 @@ class DockerToActivationLogStoreTests extends FlatSpec with 
Matchers with WskAct
       LogLine(Instant.now.toString, "stdout", "this is a log too"))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    await(store.collectLogs(tid, user, activation, container, action)) 
shouldBe ActivationLogs(
+    await(store.collectLogs(tid, user, successfulActivation, container, 
action)) shouldBe ActivationLogs(
       logs.map(_.toFormattedString).toVector)
   }
 
+  it should "read logs into a sequence and parse them into the specified 
format with developer error" in {
+    val store = createStore()
+
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stdout", "this is a log too"))
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    val collectedLogs = await(store.collectLogs(tid, user, 
developerErrorActivation, container, action)).logs
+
+    withClue("Collected logs should be empty:") {
+      collectedLogs.dropRight(1) shouldBe 
logs.map(_.toFormattedString).toVector
+    }
+
+    withClue("Last line should end with developer error warning:") {
+      val lastLogLine = collectedLogs.last
+      lastLogLine should endWith(Messages.logWarningDeveloperError)
+    }
+  }
+
+  it should "accept an empty log" in {
+    val store = createStore()
+
+    val logs = List.empty[LogLine]
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    await(store.collectLogs(tid, user, successfulActivation, container, 
action)) shouldBe ActivationLogs(
+      Vector.empty[String])
+  }
+
+  it should "accept an empty log with developer error" in {
+    val store = createStore()
+
+    val logs = List.empty[LogLine]
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    val collectedLogs = await(store.collectLogs(tid, user, 
developerErrorActivation, container, action)).logs
+
+    withClue("Collected logs should be empty:") {
+      collectedLogs.dropRight(1) shouldBe Vector.empty[String]
+    }
+
+    withClue("Last line should end with developer error warning:") {
+      val lastLogLine = collectedLogs.last
+      lastLogLine should endWith(Messages.logWarningDeveloperError)
+    }
+  }
+
   it should "report an error if the logs contain an 'official' notice of such" 
in {
     val store = createStore()
 
@@ -85,10 +136,33 @@ class DockerToActivationLogStoreTests extends FlatSpec 
with Matchers with WskAct
       LogLine(Instant.now.toString, "stderr", Messages.logFailure))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
user, activation, container, action))
+    val ex = the[LogCollectingException] thrownBy await(
+      store.collectLogs(tid, user, successfulActivation, container, action))
     ex.partialLogs shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
+  it should "report an error if the logs contain an 'official' notice of such 
with developer error" in {
+    val store = createStore()
+
+    val logs = List(
+      LogLine(Instant.now.toString, "stdout", "this is a log"),
+      LogLine(Instant.now.toString, "stderr", Messages.logFailure))
+    val container = new TestContainer(Source(toByteString(logs)))
+
+    val ex = the[LogCollectingException] thrownBy await(
+      store.collectLogs(tid, user, developerErrorActivation, container, 
action))
+    val collectedLogs = ex.partialLogs.logs
+
+    withClue("Collected logs should match provided logs:") {
+      collectedLogs.dropRight(1) shouldBe 
logs.map(_.toFormattedString).toVector
+    }
+
+    withClue("Last line should end with developer error warning:") {
+      val lastLogLine = collectedLogs.last
+      lastLogLine should endWith(Messages.logWarningDeveloperError)
+    }
+  }
+
   it should "report an error if logs have been truncated" in {
     val store = createStore()
 
@@ -97,7 +171,8 @@ class DockerToActivationLogStoreTests extends FlatSpec with 
Matchers with WskAct
       LogLine(Instant.now.toString, "stderr", 
Messages.truncateLogs(action.limits.logs.asMegaBytes)))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
user, activation, container, action))
+    val ex = the[LogCollectingException] thrownBy await(
+      store.collectLogs(tid, user, successfulActivation, container, action))
     ex.partialLogs shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
index c317373..e5d7286 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ActionLimitsTests.scala
@@ -497,7 +497,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers with WskActorSys
           result.response.result.get
             .fields("error") shouldBe 
Messages.timedoutActivation(allowedActionDuration, init = false).toJson
           val logs = result.logs.get
-          logs.last should include(Messages.logFailure)
+          logs.last should include(Messages.logWarningDeveloperError)
 
           val parseLogTime = (line: String) => Instant.parse(line.split(' 
').head)
           val startTime = parseLogTime(logs.head)

Reply via email to