This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new d69f8db Add LogStore which stores to database and file simultaneously. (#2974) d69f8db is described below commit d69f8db02be70db0da1e07614124ea2e08dd6826 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Wed Dec 6 13:57:22 2017 +0100 Add LogStore which stores to database and file simultaneously. (#2974) Adds a new LogStore which reads from the docker file in a streaming fashion, enhances the lines read and writes them out to an external file. This file can then be picked up by any asynchronous external process to decide what to do with those logs outside of the container lifecycle of the invoker. Notable changes: - Make LogStore configurable via ansible. - Add activation and user information to the collectLogs interface. - Renamed the existing LogStore to be more self-explaining. --- ansible/group_vars/all | 2 + ansible/roles/controller/tasks/deploy.yml | 2 + ansible/roles/invoker/tasks/deploy.yml | 1 + common/scala/src/main/resources/reference.conf | 4 +- .../logging/DockerToActivationFileLogStore.scala | 156 ++++++++++++++++++ ...tore.scala => DockerToActivationLogStore.scala} | 19 +-- .../containerpool/logging/LogRotatorSink.scala | 176 +++++++++++++++++++++ .../core/containerpool/logging/LogStore.scala | 10 +- .../whisk/core/containerpool/ContainerProxy.scala | 32 ++-- .../scala/whisk/core/invoker/InvokerReactive.scala | 1 + .../docker/test/DockerContainerTests.scala | 4 +- .../test/DockerToActivationFileLogStoreTests.scala | 78 +++++++++ ...scala => DockerToActivationLogStoreTests.scala} | 19 ++- .../containerpool/test/ContainerProxyTests.scala | 9 +- 14 files changed, 473 insertions(+), 40 deletions(-) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 0e89cc0..7467aee 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -184,6 +184,8 @@ invoker: docker: become: "{{ invoker_docker_become | default(false) }}" +userLogs: + spi: "{{ userLogs_spi | default('whisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}" nginx: confdir: "{{ config_root_dir }}/nginx" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index a0cdc72..6caa545 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -94,6 +94,8 @@ "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}" "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}" + + "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}" volumes: - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs" ports: diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 0aa0d5d..112b37b 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -153,6 +153,7 @@ -e METRICS_LOG='{{ metrics.log.enabled }}' -e CONFIG_kamon_statsd_hostname='{{ metrics.kamon.host }}' -e CONFIG_kamon_statsd_port='{{ metrics.kamon.port }}' + -e CONFIG_whisk_spi_LogStoreProvider='{{ userLogs.spi }}' -v /sys/fs/cgroup:/sys/fs/cgroup -v /run/runc:/run/runc -v {{ whisk_logs_dir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}:/logs diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf index bf2e694..45543e5 100644 --- a/common/scala/src/main/resources/reference.conf +++ b/common/scala/src/main/resources/reference.conf @@ -2,5 +2,5 @@ whisk.spi{ ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider - LogStoreProvider = whisk.core.containerpool.logging.DockerLogStoreProvider -} \ No newline at end of file + LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala new file mode 100644 index 0000000..f31c320 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.logging + +import java.nio.file.{Path, Paths} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.alpakka.file.scaladsl.LogRotatorSink +import akka.stream.{Graph, SinkShape, UniformFanOutShape} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source} +import akka.util.ByteString +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} +import whisk.core.entity.size._ +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import java.time.Instant + +import whisk.http.Messages + +import scala.concurrent.Future + +/** + * Docker based implementation of a LogStore. + * + * Relies on docker's implementation details with regards to the JSON log-driver. When using the JSON log-driver + * docker writes stdout/stderr to a JSON formatted file which is read by this store. Logs are written in the + * activation record itself. + * + * Additionally writes logs to a separate file which can be processed by any backend service asynchronously. + */ +class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: Path = Paths.get("logs")) + extends DockerToActivationLogStore(system) { + + /** + * End of an event as written to a file. Closes the json-object and also appends a newline. + */ + private val eventEnd = ByteString("}\n") + + private def fieldsString(fields: Map[String, JsValue]) = + fields + .map { + case (key, value) => s""""$key":${value.compactPrint}""" + } + .mkString(",") + + /** + * Merges all file-writing streams into one globally buffered stream. + * + * This effectively decouples the time it takes to {@code collectLogs} from the time it takes to write the augmented + * logging data to a file on the disk. + * + * All lines are written to a rotating sink, which will create a new file, appended with the creation timestamp, + * once the defined limit is reached. + */ + val bufferSize = 100.MB + protected val writeToFile: Sink[ByteString, _] = MergeHub + .source[ByteString] + .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _) + .to(LogRotatorSink(() => { + val maxSize = bufferSize.toBytes + var bytesRead = maxSize + element => + { + val size = element.size + if (bytesRead + size > maxSize) { + bytesRead = size + Some(destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")) + } else { + bytesRead += size + None + } + } + })) + .run() + + override def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] = { + + val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid) + + val additionalMetadata = Map( + "activationId" -> activation.activationId.asString.toJson, + "action" -> action.fullyQualifiedName(false).asString.toJson, + "userId" -> user.authkey.uuid.toJson) + + // Manually construct JSON fields to omit parsing the whole structure + val metadata = ByteString("," + fieldsString(additionalMetadata)) + + val toSeq = Flow[ByteString].via(DockerToActivationLogStore.toFormattedString).toMat(Sink.seq[String])(Keep.right) + val toFile = Flow[ByteString] + // As each element is a JSON-object, we know we can add the manually constructed fields to it by dropping + // the closing "}", adding the fields and finally add "}\n" to the end again. + .map(_.dropRight(1) ++ metadata ++ eventEnd) + // As the last element of the stream, print the activation record. + .concat(Source.single(ByteString(activation.toJson.compactPrint + "\n"))) + .to(writeToFile) + + val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_)) + + logs.runWith(combined)._1.flatMap { seq => + val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) + val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains)) + val logs = ActivationLogs(seq.toVector) + if (!errored) { + Future.successful(logs) + } else { + Future.failed(LogCollectingException(logs)) + } + } + } +} + +object DockerToActivationFileLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem): LogStore = new DockerToActivationFileLogStore(actorSystem) +} + +object OwSink { + + /** + * Combines two sinks into one sink using the given strategy. The materialized value is a Tuple2 of the materialized + * values of either sink. Code basically copied from {@code Sink.combine} + */ + def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])( + strategy: Int ⇒ Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = { + Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) => + import GraphDSL.Implicits._ + val d = b.add(strategy(2)) + + d ~> s1 + d ~> s2 + + SinkShape(d.in) + }) + } +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala similarity index 85% rename from common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala rename to common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala index 64434ad..153aa59 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala @@ -25,7 +25,7 @@ import akka.stream.scaladsl.Flow import akka.util.ByteString import whisk.common.TransactionId import whisk.core.containerpool.Container -import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation} +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import spray.json._ import whisk.http.Messages @@ -42,7 +42,7 @@ protected[core] object LogLine extends DefaultJsonProtocol { implicit val serdes = jsonFormat3(LogLine.apply) } -object DockerLogStore { +object DockerToActivationLogStore { /** Transforms chunked JsObjects into formatted strings */ val toFormattedString: Flow[ByteString, String, NotUsed] = @@ -54,9 +54,9 @@ object DockerLogStore { * * Relies on docker's implementation details with regards to the JSON log-driver. When using the JSON log-driver * docker writes stdout/stderr to a JSON formatted file which is read by this store. Logs are written in the - * activation record itself and thus stored in CouchDB. + * activation record itself. */ -class DockerLogStore(system: ActorSystem) extends LogStore { +class DockerToActivationLogStore(system: ActorSystem) extends LogStore { implicit val ec: ExecutionContext = system.dispatcher implicit val mat: ActorMaterializer = ActorMaterializer()(system) @@ -67,16 +67,17 @@ class DockerLogStore(system: ActorSystem) extends LogStore { override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs) override def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] = { - val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) - container .logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid) - .via(DockerLogStore.toFormattedString) + .via(DockerToActivationLogStore.toFormattedString) .runWith(Sink.seq) .flatMap { seq => + val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) if (!errored) { @@ -88,6 +89,6 @@ class DockerLogStore(system: ActorSystem) extends LogStore { } } -object DockerLogStoreProvider extends LogStoreProvider { - override def logStore(actorSystem: ActorSystem): LogStore = new DockerLogStore(actorSystem) +object DockerToActivationLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem): LogStore = new DockerToActivationLogStore(actorSystem) } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala new file mode 100644 index 0000000..6c5681b --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// TO BE TAKEN OUT AFTER ALPAKKA 0.15 RELEASE + +/* + * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com> + */ + +package akka.stream.alpakka.file.scaladsl + +import java.nio.file.{OpenOption, Path, StandardOpenOption} + +import akka.Done +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream._ +import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere} +import akka.stream.scaladsl.{FileIO, Sink, Source} +import akka.stream.stage._ +import akka.util.ByteString + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} + +object LogRotatorSink { + def apply(functionGeneratorFunction: () => ByteString => Option[Path], + fileOpenOptions: Set[OpenOption] = Set(StandardOpenOption.APPEND, StandardOpenOption.CREATE)) + : Sink[ByteString, Future[Done]] = + Sink.fromGraph(new LogRotatorSink(functionGeneratorFunction, fileOpenOptions)) +} + +final private[scaladsl] class LogRotatorSink(functionGeneratorFunction: () => ByteString => Option[Path], + fileOpenOptions: Set[OpenOption]) + extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] { + + val in = Inlet[ByteString]("FRotator.in") + override val shape = SinkShape.of(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val promise = Promise[Done]() + val logic = new GraphStageLogic(shape) { + val pathGeneratorFunction: ByteString => Option[Path] = functionGeneratorFunction() + var sourceOut: SubSourceOutlet[ByteString] = _ + var fileSinkCompleted: Seq[Future[IOResult]] = Seq.empty + val decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + def failThisStage(ex: Throwable): Unit = + if (!promise.isCompleted) { + if (sourceOut != null) { + sourceOut.fail(ex) + } + cancel(in) + promise.failure(ex) + } + + def generatePathOrFailPeacefully(data: ByteString): Option[Path] = { + var ret = Option.empty[Path] + try { + ret = pathGeneratorFunction(data) + } catch { + case ex: Throwable => + failThisStage(ex) + } + ret + } + + def fileSinkFutureCallbackHandler(future: Future[IOResult])(h: Holder[IOResult]): Unit = + h.elem match { + case Success(IOResult(_, Failure(ex))) if decider(ex) == Supervision.Stop => + promise.failure(ex) + case Success(x) if fileSinkCompleted.size == 1 && fileSinkCompleted.head == future => + promise.trySuccess(Done) + completeStage() + case x: Success[IOResult] => + fileSinkCompleted = fileSinkCompleted.filter(_ != future) + case Failure(ex) => + failThisStage(ex) + case _ => + } + + //init stage where we are waiting for the first path + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + val data = grab(in) + val pathO = generatePathOrFailPeacefully(data) + pathO.fold(if (!isClosed(in)) pull(in))(switchPath(_, data)) + } + + override def onUpstreamFinish(): Unit = + completeStage() + + override def onUpstreamFailure(ex: Throwable): Unit = + failThisStage(ex) + }) + + //we must pull the first element cos we are a sink + override def preStart(): Unit = { + super.preStart() + pull(in) + } + + def futureCB(newFuture: Future[IOResult]) = + getAsyncCallback[Holder[IOResult]](fileSinkFutureCallbackHandler(newFuture)) + + //we recreate the tail of the stream, and emit the data for the next req + def switchPath(path: Path, data: ByteString): Unit = { + val prevOut = Option(sourceOut) + + sourceOut = new SubSourceOutlet[ByteString]("FRotatorSource") + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = { + sourceOut.push(data) + switchToNormalMode() + } + }) + val newFuture = Source + .fromGraph(sourceOut.source) + .runWith(FileIO.toPath(path, fileOpenOptions))(interpreter.subFusingMaterializer) + + fileSinkCompleted = fileSinkCompleted :+ newFuture + + val holder = new Holder[IOResult](NotYetThere, futureCB(newFuture)) + + newFuture.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + + prevOut.foreach(_.complete()) + } + + //we change path if needed or push the grabbed data + def switchToNormalMode(): Unit = { + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + val data = grab(in) + val pathO = generatePathOrFailPeacefully(data) + pathO.fold(sourceOut.push(data))(switchPath(_, data)) + } + + override def onUpstreamFinish(): Unit = { + implicit val executionContext: ExecutionContext = + akka.dispatch.ExecutionContexts.sameThreadExecutionContext + promise.completeWith(Future.sequence(fileSinkCompleted).map(_ => Done)) + sourceOut.complete() + } + + override def onUpstreamFailure(ex: Throwable): Unit = + failThisStage(ex) + }) + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = + pull(in) + }) + } + } + (logic, promise.future) + } + +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala index 7df2f2c..335eed5 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala @@ -20,7 +20,7 @@ package whisk.core.containerpool.logging import akka.actor.ActorSystem import whisk.common.TransactionId import whisk.core.containerpool.Container -import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation} +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import whisk.spi.Spi import scala.concurrent.Future @@ -54,11 +54,17 @@ trait LogStore { * record in the database. * * @param transid transaction the activation ran in + * @param user the user who ran the activation + * @param activation the activation record * @param container container used by the activation * @param action action that was activated * @return logs for the given activation */ - def collectLogs(transid: TransactionId, container: Container, action: ExecutableWhiskAction): Future[ActivationLogs] + def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] /** * Fetch relevant logs for the given activation from the store. diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index a626a7f..beee654 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -89,13 +89,14 @@ case object ContainerRemoved * @param unusedTimeout time after which the container is automatically thrown away * @param pauseGrace time to wait for new work before pausing the container */ -class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], - storeActivation: (TransactionId, WhiskActivation) => Future[Any], - collectLogs: (TransactionId, Container, ExecutableWhiskAction) => Future[ActivationLogs], - instance: InstanceId, - unusedTimeout: FiniteDuration, - pauseGrace: FiniteDuration) +class ContainerProxy( + factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], + sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], + storeActivation: (TransactionId, WhiskActivation) => Future[Any], + collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], + instance: InstanceId, + unusedTimeout: FiniteDuration, + pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] with Stash { implicit val ec = context.system.dispatcher @@ -367,7 +368,7 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation .flatMap { activation => val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS) - collectLogs(tid, container, job.action) + collectLogs(tid, job.msg.user, activation, container, job.action) .andThen { case Success(_) => tid.finished(this, start) case Failure(t) => tid.failed(this, start, s"reading logs failed: $t") @@ -394,13 +395,14 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi } object ContainerProxy { - def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], - ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], - store: (TransactionId, WhiskActivation) => Future[Any], - collectLogs: (TransactionId, Container, ExecutableWhiskAction) => Future[ActivationLogs], - instance: InstanceId, - unusedTimeout: FiniteDuration = 10.minutes, - pauseGrace: FiniteDuration = 50.milliseconds) = + def props( + factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], + ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], + store: (TransactionId, WhiskActivation) => Future[Any], + collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], + instance: InstanceId, + unusedTimeout: FiniteDuration = 10.minutes, + pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace)) // Needs to be thread-safe as it's used by multiple proxies concurrently. diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index a835d9e..2053e3f 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -63,6 +63,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa implicit val cfg = config private val logsProvider = SpiLoader.get[LogStoreProvider].logStore(actorSystem) + logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}") /** * Factory used by the ContainerProxy to physically create a new container. diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 1519d09..5f9898e 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -33,7 +33,7 @@ import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory import org.scalatest.BeforeAndAfterEach import org.scalatest.FlatSpec -import whisk.core.containerpool.logging.{DockerLogStore, LogLine} +import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine} import org.scalatest.junit.JUnitRunner import org.scalatest.Matchers @@ -76,7 +76,7 @@ class DockerContainerTests /** Reads logs into memory and awaits them */ def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] = - Await.result(source.via(DockerLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector + Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector val containerId = ContainerId("id") diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala new file mode 100644 index 0000000..1f5f22c --- /dev/null +++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.logging.test + +import java.time.Instant + +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.testkit.TestProbe +import akka.util.ByteString +import common.{StreamLogging, WskActorSystem} +import org.scalatest.Matchers +import spray.json._ +import whisk.common.TransactionId +import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine} +import whisk.core.entity._ + +/** + * Includes the tests for the DockerToActivationLogStore since the behavior towards the activation storage should + * remain exactly the same. + */ +class DockerToActivationFileLogStoreTests + extends DockerToActivationLogStoreTests + with Matchers + with WskActorSystem + with StreamLogging { + + override def createStore() = new TestLogStoreTo(Sink.ignore) + + def toLoggedEvent(line: LogLine, userId: UUID, activationId: ActivationId, actionName: FullyQualifiedEntityName) = { + val event = line.toJson.compactPrint + val concatenated = + s""","activationId":"${activationId.asString}","action":"${actionName.asString}","userId":"${userId.asString}"""" + + event.dropRight(1) ++ concatenated ++ "}\n" + } + + behavior of "DockerCouchDbFileLogStore" + + it should "read logs returned by the container,in mem and enrich + write them to the provided sink" in { + val logs = List(LogLine(Instant.now.toString, "stdout", "this is just a test")) + + val testSource: Source[ByteString, _] = Source(logs.map(line => ByteString(line.toJson.compactPrint))) + + val testActor = TestProbe() + + val container = new TestContainer(testSource) + val store = new TestLogStoreTo(Flow[ByteString].map(_.utf8String).to(Sink.actorRef(testActor.ref, ()))) + + val collected = store.collectLogs(TransactionId.testing, user, activation, container, action) + + await(collected) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) + logs.foreach { line => + testActor.expectMsg( + toLoggedEvent(line, user.authkey.uuid, activation.activationId, action.fullyQualifiedName(false))) + } + + // Last message should be the full activation + testActor.expectMsg(activation.toJson.compactPrint + "\n") + } + + class TestLogStoreTo(override val writeToFile: Sink[ByteString, _]) + extends DockerToActivationFileLogStore(actorSystem) +} diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala similarity index 85% rename from tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala rename to tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala index d6de3cc..170f9cc 100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala @@ -19,7 +19,7 @@ package whisk.core.containerpool.logging.test import common.{StreamLogging, WskActorSystem} import org.scalatest.{FlatSpec, Matchers} -import whisk.core.containerpool.logging.{DockerLogStoreProvider, LogCollectingException, LogLine} +import whisk.core.containerpool.logging.{DockerToActivationLogStoreProvider, LogCollectingException, LogLine} import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import whisk.core.entity._ import java.time.Instant @@ -34,7 +34,7 @@ import whisk.http.Messages import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ -class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging { +class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskActorSystem with StreamLogging { def await[T](future: Future[T]) = Await.result(future, 1.minute) val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set()) @@ -47,40 +47,43 @@ class DockerLogStoreTests extends FlatSpec with Matchers with WskActorSystem wit val tid = TransactionId.testing + def createStore() = DockerToActivationLogStoreProvider.logStore(actorSystem) + behavior of "DockerLogStore" it should "read logs into a sequence and parse them into the specified format" in { - val store = DockerLogStoreProvider.logStore(actorSystem) + val store = createStore() val logs = List( LogLine(Instant.now.toString, "stdout", "this is a log"), LogLine(Instant.now.toString, "stdout", "this is a log too")) val container = new TestContainer(Source(toByteString(logs))) - await(store.collectLogs(tid, container, action)) shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) + await(store.collectLogs(tid, user, activation, container, action)) shouldBe ActivationLogs( + logs.map(_.toFormattedString).toVector) } it should "report an error if the logs contain an 'official' notice of such" in { - val store = DockerLogStoreProvider.logStore(actorSystem) + val store = createStore() val logs = List( LogLine(Instant.now.toString, "stdout", "this is a log"), LogLine(Instant.now.toString, "stderr", Messages.logFailure)) val container = new TestContainer(Source(toByteString(logs))) - val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action)) + val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action)) ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) } it should "report an error if logs have been truncated" in { - val store = DockerLogStoreProvider.logStore(actorSystem) + val store = createStore() val logs = List( LogLine(Instant.now.toString, "stdout", "this is a log"), LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(action.limits.logs.asMegaBytes))) val container = new TestContainer(Source(toByteString(logs))) - val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, container, action)) + val ex = the[LogCollectingException] thrownBy await(store.collectLogs(tid, user, activation, container, action)) ex.partialLogs shouldBe ActivationLogs(logs.map(_.toFormattedString).toVector) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 8da8aea..6182d62 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -151,8 +151,13 @@ class ContainerProxyTests } def createCollector(response: Future[ActivationLogs] = Future.successful(ActivationLogs(Vector.empty))) = - LoggedFunction { (transid: TransactionId, container: Container, action: ExecutableWhiskAction) => - response + LoggedFunction { + (transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction) => + response } def createStore = LoggedFunction { (transid: TransactionId, activation: WhiskActivation) => -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].