This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new d69f8db  Add LogStore which stores to database and file 
simultaneously. (#2974)
d69f8db is described below

commit d69f8db02be70db0da1e07614124ea2e08dd6826
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Wed Dec 6 13:57:22 2017 +0100

    Add LogStore which stores to database and file simultaneously. (#2974)
    
    Adds a new LogStore which reads from the docker file in a streaming 
fashion, enhances the lines read and writes them out to an external file. This 
file can then be picked up by any asynchronous external process to decide what 
to do with those logs outside of the container lifecycle of the invoker.
    
    Notable changes:
    - Make LogStore configurable via ansible.
    - Add activation and user information to the collectLogs interface.
    - Renamed the existing LogStore to be more self-explaining.
---
 ansible/group_vars/all                             |   2 +
 ansible/roles/controller/tasks/deploy.yml          |   2 +
 ansible/roles/invoker/tasks/deploy.yml             |   1 +
 common/scala/src/main/resources/reference.conf     |   4 +-
 .../logging/DockerToActivationFileLogStore.scala   | 156 ++++++++++++++++++
 ...tore.scala => DockerToActivationLogStore.scala} |  19 +--
 .../containerpool/logging/LogRotatorSink.scala     | 176 +++++++++++++++++++++
 .../core/containerpool/logging/LogStore.scala      |  10 +-
 .../whisk/core/containerpool/ContainerProxy.scala  |  32 ++--
 .../scala/whisk/core/invoker/InvokerReactive.scala |   1 +
 .../docker/test/DockerContainerTests.scala         |   4 +-
 .../test/DockerToActivationFileLogStoreTests.scala |  78 +++++++++
 ...scala => DockerToActivationLogStoreTests.scala} |  19 ++-
 .../containerpool/test/ContainerProxyTests.scala   |   9 +-
 14 files changed, 473 insertions(+), 40 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0e89cc0..7467aee 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -184,6 +184,8 @@ invoker:
   docker:
     become: "{{ invoker_docker_become | default(false) }}"
 
+userLogs:
+  spi: "{{ userLogs_spi | 
default('whisk.core.containerpool.logging.DockerToActivationLogStoreProvider') 
}}"
 
 nginx:
   confdir: "{{ config_root_dir }}/nginx"
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index a0cdc72..6caa545 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -94,6 +94,8 @@
 
       "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
+
+      "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ 
groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 0aa0d5d..112b37b 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -153,6 +153,7 @@
         -e METRICS_LOG='{{ metrics.log.enabled }}'
         -e CONFIG_kamon_statsd_hostname='{{ metrics.kamon.host }}'
         -e CONFIG_kamon_statsd_port='{{ metrics.kamon.port }}'
+        -e CONFIG_whisk_spi_LogStoreProvider='{{ userLogs.spi }}'
         -v /sys/fs/cgroup:/sys/fs/cgroup
         -v /run/runc:/run/runc
         -v {{ whisk_logs_dir }}/invoker{{ 
groups['invokers'].index(inventory_hostname) }}:/logs
diff --git a/common/scala/src/main/resources/reference.conf 
b/common/scala/src/main/resources/reference.conf
index bf2e694..45543e5 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -2,5 +2,5 @@ whisk.spi{
   ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
   MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
   ContainerFactoryProvider = 
whisk.core.containerpool.docker.DockerContainerFactoryProvider
-  LogStoreProvider = whisk.core.containerpool.logging.DockerLogStoreProvider
-}
\ No newline at end of file
+  LogStoreProvider = 
whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
new file mode 100644
index 0000000..f31c320
--- /dev/null
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.nio.file.{Path, Paths}
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.{Graph, SinkShape, UniformFanOutShape}
+import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, 
Source}
+import akka.util.ByteString
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
+import whisk.core.entity.size._
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import java.time.Instant
+
+import whisk.http.Messages
+
+import scala.concurrent.Future
+
+/**
+ * Docker based implementation of a LogStore.
+ *
+ * Relies on docker's implementation details with regards to the JSON 
log-driver. When using the JSON log-driver
+ * docker writes stdout/stderr to a JSON formatted file which is read by this 
store. Logs are written in the
+ * activation record itself.
+ *
+ * Additionally writes logs to a separate file which can be processed by any 
backend service asynchronously.
+ */
+class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory: Path = Paths.get("logs"))
+    extends DockerToActivationLogStore(system) {
+
+  /**
+   * End of an event as written to a file. Closes the json-object and also 
appends a newline.
+   */
+  private val eventEnd = ByteString("}\n")
+
+  private def fieldsString(fields: Map[String, JsValue]) =
+    fields
+      .map {
+        case (key, value) => s""""$key":${value.compactPrint}"""
+      }
+      .mkString(",")
+
+  /**
+   * Merges all file-writing streams into one globally buffered stream.
+   *
+   * This effectively decouples the time it takes to {@code collectLogs} from 
the time it takes to write the augmented
+   * logging data to a file on the disk.
+   *
+   * All lines are written to a rotating sink, which will create a new file, 
appended with the creation timestamp,
+   * once the defined limit is reached.
+   */
+  val bufferSize = 100.MB
+  protected val writeToFile: Sink[ByteString, _] = MergeHub
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
+    .to(LogRotatorSink(() => {
+      val maxSize = bufferSize.toBytes
+      var bytesRead = maxSize
+      element =>
+        {
+          val size = element.size
+          if (bytesRead + size > maxSize) {
+            bytesRead = size
+            
Some(destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log"))
+          } else {
+            bytesRead += size
+            None
+          }
+        }
+    }))
+    .run()
+
+  override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
+                           container: Container,
+                           action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
+
+    val logs = container.logs(action.limits.logs.asMegaBytes, 
action.exec.sentinelledLogs)(transid)
+
+    val additionalMetadata = Map(
+      "activationId" -> activation.activationId.asString.toJson,
+      "action" -> action.fullyQualifiedName(false).asString.toJson,
+      "userId" -> user.authkey.uuid.toJson)
+
+    // Manually construct JSON fields to omit parsing the whole structure
+    val metadata = ByteString("," + fieldsString(additionalMetadata))
+
+    val toSeq = 
Flow[ByteString].via(DockerToActivationLogStore.toFormattedString).toMat(Sink.seq[String])(Keep.right)
+    val toFile = Flow[ByteString]
+    // As each element is a JSON-object, we know we can add the manually 
constructed fields to it by dropping
+    // the closing "}", adding the fields and finally add "}\n" to the end 
again.
+      .map(_.dropRight(1) ++ metadata ++ eventEnd)
+      // As the last element of the stream, print the activation record.
+      .concat(Source.single(ByteString(activation.toJson.compactPrint + "\n")))
+      .to(writeToFile)
+
+    val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_))
+
+    logs.runWith(combined)._1.flatMap { seq =>
+      val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
+      val errored = seq.lastOption.exists(last => 
possibleErrors.exists(last.contains))
+      val logs = ActivationLogs(seq.toVector)
+      if (!errored) {
+        Future.successful(logs)
+      } else {
+        Future.failed(LogCollectingException(logs))
+      }
+    }
+  }
+}
+
+object DockerToActivationFileLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new 
DockerToActivationFileLogStore(actorSystem)
+}
+
+object OwSink {
+
+  /**
+   * Combines two sinks into one sink using the given strategy. The 
materialized value is a Tuple2 of the materialized
+   * values of either sink. Code basically copied from {@code Sink.combine}
+   */
+  def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
+    strategy: Int ⇒ Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, 
M2)] = {
+    Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, 
s2) =>
+      import GraphDSL.Implicits._
+      val d = b.add(strategy(2))
+
+      d ~> s1
+      d ~> s2
+
+      SinkShape(d.in)
+    })
+  }
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
similarity index 85%
rename from 
common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
rename to 
common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 64434ad..153aa59 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -25,7 +25,7 @@ import akka.stream.scaladsl.Flow
 import akka.util.ByteString
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
 import spray.json._
 import whisk.http.Messages
 
@@ -42,7 +42,7 @@ protected[core] object LogLine extends DefaultJsonProtocol {
   implicit val serdes = jsonFormat3(LogLine.apply)
 }
 
-object DockerLogStore {
+object DockerToActivationLogStore {
 
   /** Transforms chunked JsObjects into formatted strings */
   val toFormattedString: Flow[ByteString, String, NotUsed] =
@@ -54,9 +54,9 @@ object DockerLogStore {
  *
  * Relies on docker's implementation details with regards to the JSON 
log-driver. When using the JSON log-driver
  * docker writes stdout/stderr to a JSON formatted file which is read by this 
store. Logs are written in the
- * activation record itself and thus stored in CouchDB.
+ * activation record itself.
  */
-class DockerLogStore(system: ActorSystem) extends LogStore {
+class DockerToActivationLogStore(system: ActorSystem) extends LogStore {
   implicit val ec: ExecutionContext = system.dispatcher
   implicit val mat: ActorMaterializer = ActorMaterializer()(system)
 
@@ -67,16 +67,17 @@ class DockerLogStore(system: ActorSystem) extends LogStore {
   override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] 
= Future.successful(activation.logs)
 
   override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
                            container: Container,
                            action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
 
-    val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
-
     container
       .logs(action.limits.logs.asMegaBytes, 
action.exec.sentinelledLogs)(transid)
-      .via(DockerLogStore.toFormattedString)
+      .via(DockerToActivationLogStore.toFormattedString)
       .runWith(Sink.seq)
       .flatMap { seq =>
+        val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
         val errored = seq.lastOption.exists(last => 
possibleErrors.exists(last.contains))
         val logs = ActivationLogs(seq.toVector)
         if (!errored) {
@@ -88,6 +89,6 @@ class DockerLogStore(system: ActorSystem) extends LogStore {
   }
 }
 
-object DockerLogStoreProvider extends LogStoreProvider {
-  override def logStore(actorSystem: ActorSystem): LogStore = new 
DockerLogStore(actorSystem)
+object DockerToActivationLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore = new 
DockerToActivationLogStore(actorSystem)
 }
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
new file mode 100644
index 0000000..6c5681b
--- /dev/null
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TO BE TAKEN OUT AFTER ALPAKKA 0.15 RELEASE
+
+/*
+ * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
+ */
+
+package akka.stream.alpakka.file.scaladsl
+
+import java.nio.file.{OpenOption, Path, StandardOpenOption}
+
+import akka.Done
+import akka.stream.ActorAttributes.SupervisionStrategy
+import akka.stream._
+import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere}
+import akka.stream.scaladsl.{FileIO, Sink, Source}
+import akka.stream.stage._
+import akka.util.ByteString
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+object LogRotatorSink {
+  def apply(functionGeneratorFunction: () => ByteString => Option[Path],
+            fileOpenOptions: Set[OpenOption] = Set(StandardOpenOption.APPEND, 
StandardOpenOption.CREATE))
+    : Sink[ByteString, Future[Done]] =
+    Sink.fromGraph(new LogRotatorSink(functionGeneratorFunction, 
fileOpenOptions))
+}
+
+final private[scaladsl] class LogRotatorSink(functionGeneratorFunction: () => 
ByteString => Option[Path],
+                                             fileOpenOptions: Set[OpenOption])
+    extends GraphStageWithMaterializedValue[SinkShape[ByteString], 
Future[Done]] {
+
+  val in = Inlet[ByteString]("FRotator.in")
+  override val shape = SinkShape.of(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogic(shape) {
+      val pathGeneratorFunction: ByteString => Option[Path] = 
functionGeneratorFunction()
+      var sourceOut: SubSourceOutlet[ByteString] = _
+      var fileSinkCompleted: Seq[Future[IOResult]] = Seq.empty
+      val decider =
+        
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
+
+      def failThisStage(ex: Throwable): Unit =
+        if (!promise.isCompleted) {
+          if (sourceOut != null) {
+            sourceOut.fail(ex)
+          }
+          cancel(in)
+          promise.failure(ex)
+        }
+
+      def generatePathOrFailPeacefully(data: ByteString): Option[Path] = {
+        var ret = Option.empty[Path]
+        try {
+          ret = pathGeneratorFunction(data)
+        } catch {
+          case ex: Throwable =>
+            failThisStage(ex)
+        }
+        ret
+      }
+
+      def fileSinkFutureCallbackHandler(future: Future[IOResult])(h: 
Holder[IOResult]): Unit =
+        h.elem match {
+          case Success(IOResult(_, Failure(ex))) if decider(ex) == 
Supervision.Stop =>
+            promise.failure(ex)
+          case Success(x) if fileSinkCompleted.size == 1 && 
fileSinkCompleted.head == future =>
+            promise.trySuccess(Done)
+            completeStage()
+          case x: Success[IOResult] =>
+            fileSinkCompleted = fileSinkCompleted.filter(_ != future)
+          case Failure(ex) =>
+            failThisStage(ex)
+          case _ =>
+        }
+
+      //init stage where we are waiting for the first path
+      setHandler(
+        in,
+        new InHandler {
+          override def onPush(): Unit = {
+            val data = grab(in)
+            val pathO = generatePathOrFailPeacefully(data)
+            pathO.fold(if (!isClosed(in)) pull(in))(switchPath(_, data))
+          }
+
+          override def onUpstreamFinish(): Unit =
+            completeStage()
+
+          override def onUpstreamFailure(ex: Throwable): Unit =
+            failThisStage(ex)
+        })
+
+      //we must pull the first element cos we are a sink
+      override def preStart(): Unit = {
+        super.preStart()
+        pull(in)
+      }
+
+      def futureCB(newFuture: Future[IOResult]) =
+        
getAsyncCallback[Holder[IOResult]](fileSinkFutureCallbackHandler(newFuture))
+
+      //we recreate the tail of the stream, and emit the data for the next req
+      def switchPath(path: Path, data: ByteString): Unit = {
+        val prevOut = Option(sourceOut)
+
+        sourceOut = new SubSourceOutlet[ByteString]("FRotatorSource")
+        sourceOut.setHandler(new OutHandler {
+          override def onPull(): Unit = {
+            sourceOut.push(data)
+            switchToNormalMode()
+          }
+        })
+        val newFuture = Source
+          .fromGraph(sourceOut.source)
+          .runWith(FileIO.toPath(path, 
fileOpenOptions))(interpreter.subFusingMaterializer)
+
+        fileSinkCompleted = fileSinkCompleted :+ newFuture
+
+        val holder = new Holder[IOResult](NotYetThere, futureCB(newFuture))
+
+        
newFuture.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
+
+        prevOut.foreach(_.complete())
+      }
+
+      //we change path if needed or push the grabbed data
+      def switchToNormalMode(): Unit = {
+        setHandler(
+          in,
+          new InHandler {
+            override def onPush(): Unit = {
+              val data = grab(in)
+              val pathO = generatePathOrFailPeacefully(data)
+              pathO.fold(sourceOut.push(data))(switchPath(_, data))
+            }
+
+            override def onUpstreamFinish(): Unit = {
+              implicit val executionContext: ExecutionContext =
+                akka.dispatch.ExecutionContexts.sameThreadExecutionContext
+              promise.completeWith(Future.sequence(fileSinkCompleted).map(_ => 
Done))
+              sourceOut.complete()
+            }
+
+            override def onUpstreamFailure(ex: Throwable): Unit =
+              failThisStage(ex)
+          })
+        sourceOut.setHandler(new OutHandler {
+          override def onPull(): Unit =
+            pull(in)
+        })
+      }
+    }
+    (logic, promise.future)
+  }
+
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 7df2f2c..335eed5 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -20,7 +20,7 @@ package whisk.core.containerpool.logging
 import akka.actor.ActorSystem
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
 import whisk.spi.Spi
 
 import scala.concurrent.Future
@@ -54,11 +54,17 @@ trait LogStore {
    * record in the database.
    *
    * @param transid transaction the activation ran in
+   * @param user the user who ran the activation
+   * @param activation the activation record
    * @param container container used by the activation
    * @param action action that was activated
    * @return logs for the given activation
    */
-  def collectLogs(transid: TransactionId, container: Container, action: 
ExecutableWhiskAction): Future[ActivationLogs]
+  def collectLogs(transid: TransactionId,
+                  user: Identity,
+                  activation: WhiskActivation,
+                  container: Container,
+                  action: ExecutableWhiskAction): Future[ActivationLogs]
 
   /**
    * Fetch relevant logs for the given activation from the store.
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala 
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index a626a7f..beee654 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -89,13 +89,14 @@ case object ContainerRemoved
  * @param unusedTimeout time after which the container is automatically thrown 
away
  * @param pauseGrace time to wait for new work before pausing the container
  */
-class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, 
ByteSize) => Future[Container],
-                     sendActiveAck: (TransactionId, WhiskActivation, Boolean, 
InstanceId) => Future[Any],
-                     storeActivation: (TransactionId, WhiskActivation) => 
Future[Any],
-                     collectLogs: (TransactionId, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
-                     instance: InstanceId,
-                     unusedTimeout: FiniteDuration,
-                     pauseGrace: FiniteDuration)
+class ContainerProxy(
+  factory: (TransactionId, String, ImageName, Boolean, ByteSize) => 
Future[Container],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => 
Future[Any],
+  storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+  collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
+  instance: InstanceId,
+  unusedTimeout: FiniteDuration,
+  pauseGrace: FiniteDuration)
     extends FSM[ContainerState, ContainerData]
     with Stash {
   implicit val ec = context.system.dispatcher
@@ -367,7 +368,7 @@ class ContainerProxy(factory: (TransactionId, String, 
ImageName, Boolean, ByteSi
     val activationWithLogs: Future[Either[ActivationLogReadingError, 
WhiskActivation]] = activation
       .flatMap { activation =>
         val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS)
-        collectLogs(tid, container, job.action)
+        collectLogs(tid, job.msg.user, activation, container, job.action)
           .andThen {
             case Success(_) => tid.finished(this, start)
             case Failure(t) => tid.failed(this, start, s"reading logs failed: 
$t")
@@ -394,13 +395,14 @@ class ContainerProxy(factory: (TransactionId, String, 
ImageName, Boolean, ByteSi
 }
 
 object ContainerProxy {
-  def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => 
Future[Container],
-            ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => 
Future[Any],
-            store: (TransactionId, WhiskActivation) => Future[Any],
-            collectLogs: (TransactionId, Container, ExecutableWhiskAction) => 
Future[ActivationLogs],
-            instance: InstanceId,
-            unusedTimeout: FiniteDuration = 10.minutes,
-            pauseGrace: FiniteDuration = 50.milliseconds) =
+  def props(
+    factory: (TransactionId, String, ImageName, Boolean, ByteSize) => 
Future[Container],
+    ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+    store: (TransactionId, WhiskActivation) => Future[Any],
+    collectLogs: (TransactionId, Identity, WhiskActivation, Container, 
ExecutableWhiskAction) => Future[ActivationLogs],
+    instance: InstanceId,
+    unusedTimeout: FiniteDuration = 10.minutes,
+    pauseGrace: FiniteDuration = 50.milliseconds) =
     Props(new ContainerProxy(factory, ack, store, collectLogs, instance, 
unusedTimeout, pauseGrace))
 
   // Needs to be thread-safe as it's used by multiple proxies concurrently.
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index a835d9e..2053e3f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -63,6 +63,7 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   implicit val cfg = config
 
   private val logsProvider = 
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
+  logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
 
   /**
    * Factory used by the ContainerProxy to physically create a new container.
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 1519d09..5f9898e 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -33,7 +33,7 @@ import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
-import whisk.core.containerpool.logging.{DockerLogStore, LogLine}
+import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
 
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
@@ -76,7 +76,7 @@ class DockerContainerTests
 
   /** Reads logs into memory and awaits them */
   def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 
500.milliseconds): Vector[String] =
-    
Await.result(source.via(DockerLogStore.toFormattedString).runWith(Sink.seq[String]),
 timeout).toVector
+    
Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]),
 timeout).toVector
 
   val containerId = ContainerId("id")
 
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
new file mode 100644
index 0000000..1f5f22c
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging.test
+
+import java.time.Instant
+
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.testkit.TestProbe
+import akka.util.ByteString
+import common.{StreamLogging, WskActorSystem}
+import org.scalatest.Matchers
+import spray.json._
+import whisk.common.TransactionId
+import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, 
LogLine}
+import whisk.core.entity._
+
+/**
+ * Includes the tests for the DockerToActivationLogStore since the behavior 
towards the activation storage should
+ * remain exactly the same.
+ */
+class DockerToActivationFileLogStoreTests
+    extends DockerToActivationLogStoreTests
+    with Matchers
+    with WskActorSystem
+    with StreamLogging {
+
+  override def createStore() = new TestLogStoreTo(Sink.ignore)
+
+  def toLoggedEvent(line: LogLine, userId: UUID, activationId: ActivationId, 
actionName: FullyQualifiedEntityName) = {
+    val event = line.toJson.compactPrint
+    val concatenated =
+      
s""","activationId":"${activationId.asString}","action":"${actionName.asString}","userId":"${userId.asString}""""
+
+    event.dropRight(1) ++ concatenated ++ "}\n"
+  }
+
+  behavior of "DockerCouchDbFileLogStore"
+
+  it should "read logs returned by the container,in mem and enrich + write 
them to the provided sink" in {
+    val logs = List(LogLine(Instant.now.toString, "stdout", "this is just a 
test"))
+
+    val testSource: Source[ByteString, _] = Source(logs.map(line => 
ByteString(line.toJson.compactPrint)))
+
+    val testActor = TestProbe()
+
+    val container = new TestContainer(testSource)
+    val store = new 
TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref,
 ())))
+
+    val collected = store.collectLogs(TransactionId.testing, user, activation, 
container, action)
+
+    await(collected) shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
+    logs.foreach { line =>
+      testActor.expectMsg(
+        toLoggedEvent(line, user.authkey.uuid, activation.activationId, 
action.fullyQualifiedName(false)))
+    }
+
+    // Last message should be the full activation
+    testActor.expectMsg(activation.toJson.compactPrint + "\n")
+  }
+
+  class TestLogStoreTo(override val writeToFile: Sink[ByteString, _])
+      extends DockerToActivationFileLogStore(actorSystem)
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
similarity index 85%
rename from 
tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
rename to 
tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index d6de3cc..170f9cc 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -19,7 +19,7 @@ package whisk.core.containerpool.logging.test
 
 import common.{StreamLogging, WskActorSystem}
 import org.scalatest.{FlatSpec, Matchers}
-import whisk.core.containerpool.logging.{DockerLogStoreProvider, 
LogCollectingException, LogLine}
+import whisk.core.containerpool.logging.{DockerToActivationLogStoreProvider, 
LogCollectingException, LogLine}
 import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
 import whisk.core.entity._
 import java.time.Instant
@@ -34,7 +34,7 @@ import whisk.http.Messages
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 
-class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem 
with StreamLogging {
+class DockerToActivationLogStoreTests extends FlatSpec with Matchers with 
WskActorSystem with StreamLogging {
   def await[T](future: Future[T]) = Await.result(future, 1.minute)
 
   val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
@@ -47,40 +47,43 @@ class DockerLogStoreTests extends FlatSpec with Matchers 
with WskActorSystem wit
 
   val tid = TransactionId.testing
 
+  def createStore() = DockerToActivationLogStoreProvider.logStore(actorSystem)
+
   behavior of "DockerLogStore"
 
   it should "read logs into a sequence and parse them into the specified 
format" in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stdout", "this is a log too"))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    await(store.collectLogs(tid, container, action)) shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
+    await(store.collectLogs(tid, user, activation, container, action)) 
shouldBe ActivationLogs(
+      logs.map(_.toFormattedString).toVector)
   }
 
   it should "report an error if the logs contain an 'official' notice of such" 
in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stderr", Messages.logFailure))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
container, action))
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
user, activation, container, action))
     ex.partialLogs shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
   it should "report an error if logs have been truncated" in {
-    val store = DockerLogStoreProvider.logStore(actorSystem)
+    val store = createStore()
 
     val logs = List(
       LogLine(Instant.now.toString, "stdout", "this is a log"),
       LogLine(Instant.now.toString, "stderr", 
Messages.truncateLogs(action.limits.logs.asMegaBytes)))
     val container = new TestContainer(Source(toByteString(logs)))
 
-    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
container, action))
+    val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, 
user, activation, container, action))
     ex.partialLogs shouldBe 
ActivationLogs(logs.map(_.toFormattedString).toVector)
   }
 
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 8da8aea..6182d62 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -151,8 +151,13 @@ class ContainerProxyTests
   }
 
   def createCollector(response: Future[ActivationLogs] = 
Future.successful(ActivationLogs(Vector.empty))) =
-    LoggedFunction { (transid: TransactionId, container: Container, action: 
ExecutableWhiskAction) =>
-      response
+    LoggedFunction {
+      (transid: TransactionId,
+       user: Identity,
+       activation: WhiskActivation,
+       container: Container,
+       action: ExecutableWhiskAction) =>
+        response
     }
 
   def createStore = LoggedFunction { (transid: TransactionId, activation: 
WhiskActivation) =>

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Reply via email to