This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b3a57dd Provide Artifact with File Storage Activation Store (#3991)
b3a57dd is described below
commit b3a57ddf7409cd71ebb84b85143f0920995132f2
Author: James Dubee <[email protected]>
AuthorDate: Thu Jan 24 13:53:53 2019 -0500
Provide Artifact with File Storage Activation Store (#3991)
---
.../org/apache/openwhisk/core/WhiskConfig.scala | 4 +
.../core/database/ActivationFileStorage.scala | 118 ++++++++++++++++
.../ArtifactWithFileStorageActivationStore.scala | 61 ++++++++
.../openwhisk/core/entity/WhiskActivation.scala | 14 +-
.../apache/openwhisk/core/controller/Actions.scala | 2 +-
.../openwhisk/core/controller/Activations.scala | 4 +-
.../core/controller/test/ActionsApiTests.scala | 8 +-
.../core/controller/test/ActivationsApiTests.scala | 12 +-
.../controller/test/ControllerTestCommon.scala | 8 +-
...tifactWithFileStorageActivationStoreTests.scala | 157 +++++++++++++++++++++
10 files changed, 370 insertions(+), 18 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 8061b2a..32f5a90 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -243,4 +243,8 @@ object ConfigKeys {
val controller = s"whisk.controller"
val controllerActivation = s"$controller.activation"
+
+ val activationStore = "whisk.activation-store"
+ val activationStoreWithFileStorage = s"$activationStore.with-file-storage"
+
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
new file mode 100644
index 0000000..7fd7741
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationFileStorage.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.nio.file.attribute.PosixFilePermission._
+import java.nio.file.{Files, Path}
+import java.time.Instant
+import java.util.EnumSet
+
+import akka.stream.ActorMaterializer
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
+import akka.util.ByteString
+import org.apache.openwhisk.common.Logging
+import
org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import spray.json._
+
+import scala.concurrent.duration._
+
+/**
+ * Appends activation records and their user logs, one JSON object per line, to rotating log files.
+ *
+ * All lines handed to [[activationToFile]] are funnelled through a single MergeHub-backed sink that is
+ * materialized once, when this class is constructed. A file named `<logFilePrefix>-<epoch millis>.log`
+ * is created under `logPath` for the first non-empty element, and a new one is started whenever
+ * the bytes written to the current file would exceed the 100 MB buffer size. The file sink is wrapped
+ * in a RestartSink that backs off between 1 and 60 seconds.
+ *
+ * @param logFilePrefix prefix of every log file name
+ * @param logPath directory in which the log files are created
+ * @param actorMaterializer materializer used to run the file-writing streams
+ * @param logging logger used to report rotations and file-creation failures
+ */
+class ActivationFileStorage(logFilePrefix: String,
+                            logPath: Path,
+                            actorMaterializer: ActorMaterializer,
+                            logging: Logging) {
+
+  implicit val materializer: ActorMaterializer = actorMaterializer
+
+  // Written from within the file-writing stream and read by callers of getLogFile, which may run on a
+  // different thread; @volatile makes the most recent rotation visible to those readers.
+  @volatile private var logFile: Path = logPath
+  private val bufferSize = 100.MB
+  private val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
+  private val writeToFile: Sink[ByteString, _] = MergeHub
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
+    .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
+      LogRotatorSink(() => {
+        val maxSize = bufferSize.toBytes
+        // Start out "full" so that the first non-empty element triggers creation of a log file.
+        var bytesRead = maxSize
+        element =>
+          {
+            val size = element.size
+
+            if (bytesRead + size > maxSize) {
+              val rotated = logPath.resolve(s"$logFilePrefix-${Instant.now.toEpochMilli}.log")
+              logFile = rotated
+
+              logging.info(this, s"Rotating log file to '$rotated'")
+              createLogFile(rotated)
+              bytesRead = size
+              Some(rotated)
+            } else {
+              bytesRead += size
+              None
+            }
+          }
+      })
+    })
+    .run()
+
+  /**
+   * Creates the file at `path` and applies the read/write permissions held in `perms`.
+   * Non-fatal failures are logged together with the offending path and then rethrown.
+   */
+  private def createLogFile(path: Path) =
+    try {
+      Files.createFile(path)
+      Files.setPosixFilePermissions(path, perms)
+    } catch {
+      case scala.util.control.NonFatal(t) =>
+        logging.error(this, s"Couldn't create user log file '$path': $t")
+        throw t
+    }
+
+  /**
+   * Renders every log line of the activation as a newline-terminated JSON object carrying the fields
+   * `type` ("user_log"), `message`, `activationId` and the given additional fields.
+   */
+  private def transcribeLogs(activation: WhiskActivation, additionalFields: Map[String, JsValue]) =
+    activation.logs.logs.map { log =>
+      val line = JsObject(
+        Map(
+          "type" -> "user_log".toJson,
+          "message" -> log.toJson,
+          "activationId" -> activation.activationId.toJson) ++ additionalFields)
+
+      ByteString(s"${line.compactPrint}\n")
+    }
+
+  /**
+   * Renders the activation's metadata as a single newline-terminated JSON object. The `logs` and
+   * `annotations` fields are removed; `type` ("activation_record"), the individual annotations,
+   * `message` and the additional fields are added, later entries overriding earlier ones.
+   */
+  private def transcribeActivation(activation: WhiskActivation, additionalFields: Map[String, JsValue]) = {
+    val transactionType = Map("type" -> "activation_record".toJson)
+    val message = Map(
+      "message" -> s"Activation record '${activation.activationId}' for entity '${activation.name}'".toJson)
+    val annotations = activation.annotations.toJsObject.fields
+    val addFields = transactionType ++ annotations ++ message ++ additionalFields
+    val removeFields = Seq("logs", "annotations")
+    val line = activation.metadata.toExtendedJson(removeFields, addFields)
+
+    ByteString(s"${line.compactPrint}\n")
+  }
+
+  /** The file most recently selected by rotation, or `logPath` itself if nothing has been written yet. */
+  def getLogFile: Path = logFile
+
+  /**
+   * Queues the activation's log lines, followed by its metadata record, for writing to the log file.
+   *
+   * @param activation activation to transcribe
+   * @param context user context of the request (not used by this method)
+   * @param additionalFields extra JSON fields added to every emitted line
+   */
+  def activationToFile(activation: WhiskActivation,
+                       context: UserContext,
+                       additionalFields: Map[String, JsValue] = Map.empty) = {
+    val transcribedLogs = transcribeLogs(activation, additionalFields)
+    val transcribedActivation = transcribeActivation(activation, additionalFields)
+
+    // Write each log line to file and then write the activation metadata
+    Source
+      .fromIterator(() => transcribedLogs.iterator)
+      .runWith(Flow[ByteString].concat(Source.single(transcribedActivation)).to(writeToFile))
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
new file mode 100644
index 0000000..597a1f9
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStore.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.nio.file.Paths
+
+import akka.actor.ActorSystem
+import akka.stream._
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.entity.{DocInfo, _}
+import pureconfig.loadConfigOrThrow
+import spray.json._
+
+import scala.concurrent.Future
+
+/**
+ * Settings for [[ArtifactWithFileStorageActivationStore]].
+ *
+ * @param logFilePrefix prefix handed to the file storage for naming log files
+ * @param logPath path, as a string, of the location handed to the file storage
+ * @param userIdField name of the JSON field under which the namespace UUID is written
+ */
+case class ArtifactWithFileStorageActivationStoreConfig(
+  logFilePrefix: String,
+  logPath: String,
+  userIdField: String)
+
+/**
+ * Activation store that hands every stored activation to an [[ActivationFileStorage]] for
+ * transcription to file and then delegates to the `store` of [[ArtifactActivationStore]].
+ *
+ * @param actorSystem actor system passed on to the parent store
+ * @param actorMaterializer materializer passed on to the parent store and to the file storage
+ * @param logging logger passed on to the parent store and to the file storage
+ * @param config file storage settings; loaded from `ConfigKeys.activationStoreWithFileStorage` by default
+ */
+class ArtifactWithFileStorageActivationStore(
+  actorSystem: ActorSystem,
+  actorMaterializer: ActorMaterializer,
+  logging: Logging,
+  config: ArtifactWithFileStorageActivationStoreConfig =
+    loadConfigOrThrow[ArtifactWithFileStorageActivationStoreConfig](ConfigKeys.activationStoreWithFileStorage))
+    extends ArtifactActivationStore(actorSystem, actorMaterializer, logging) {
+
+  private val activationFileStorage = {
+    val logDirectory = Paths.get(config.logPath)
+    new ActivationFileStorage(config.logFilePrefix, logDirectory, actorMaterializer, logging)
+  }
+
+  /** Log file currently reported by the underlying file storage. */
+  def getLogFile = activationFileStorage.getLogFile
+
+  /**
+   * Passes the activation, tagged with the caller's namespace UUID under `config.userIdField`, to the
+   * file storage and then stores it through the parent class.
+   */
+  override def store(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    val namespaceId = context.user.namespace.uuid.toJson
+    val additionalFields = Map(config.userIdField -> namespaceId)
+
+    activationFileStorage.activationToFile(activation, context, additionalFields)
+    super.store(activation, context)
+  }
+
+}
+
+/**
+ * Provider that builds an [[ArtifactWithFileStorageActivationStore]], leaving its `config`
+ * parameter at the default value.
+ */
+object ArtifactWithFileStorageActivationStoreProvider extends ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+    new ArtifactWithFileStorageActivationStore(
+      actorSystem = actorSystem,
+      actorMaterializer = actorMaterializer,
+      logging = logging)
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
index 59c892a..4222e51 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskActivation.scala
@@ -99,9 +99,10 @@ case class WhiskActivation(namespace: EntityPath,
def resultAsJson = response.result.toJson.asJsObject
- def toExtendedJson = {
+ def toExtendedJson(removeFields: Seq[String] = Seq.empty, addFields:
Map[String, JsValue] = Map.empty) = {
val JsObject(baseFields) = WhiskActivation.serdes.write(this).asJsObject
- val newFields = (baseFields - "response") + ("response" ->
response.toExtendedJson)
+
+ val newFields = (baseFields - "response") + ("response" ->
response.toExtendedJson) -- removeFields ++ addFields
if (end != Instant.EPOCH) {
val durationValue = (duration getOrElse (end.toEpochMilli -
start.toEpochMilli)).toJson
JsObject(newFields + ("duration" -> durationValue))
@@ -110,10 +111,17 @@ case class WhiskActivation(namespace: EntityPath,
}
}
+ /**
+  * Copy of this activation whose response is `response.withoutResult` and whose annotations and
+  * logs are empty; the revision of this activation is carried over to the copy.
+  */
+ def metadata = {
+   val stripped = copy(response = response.withoutResult, annotations = Parameters(), logs = ActivationLogs())
+   stripped.revision[WhiskActivation](rev)
+ }
+ /**
+  * Copy of this activation whose response is `response.withoutResult`; the revision of this
+  * activation is carried over to the copy.
+  */
+ def withoutResult = {
+   val stripped = copy(response = response.withoutResult)
+   stripped.revision[WhiskActivation](rev)
+ }
def withoutLogsOrResult = {
copy(response = response.withoutResult, logs =
ActivationLogs()).revision[WhiskActivation](rev)
}
-
def withoutLogs = copy(logs =
ActivationLogs()).revision[WhiskActivation](rev)
def withLogs(logs: ActivationLogs) = copy(logs =
logs).revision[WhiskActivation](rev)
}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
index 3587a7e..d535fd1 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala
@@ -263,7 +263,7 @@ trait WhiskActionsApi extends WhiskCollectionAPI with
PostActionActivation with
complete(Accepted, activationId.toJsObject)
}
case Success(Right(activation)) =>
- val response = if (result) activation.resultAsJson else
activation.toExtendedJson
+ val response = if (result) activation.resultAsJson else
activation.toExtendedJson()
respondWithActivationIdHeader(activation.activationId) {
if (activation.response.isSuccess) {
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
index 5e8f0ab..9100ba4 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Activations.scala
@@ -172,7 +172,7 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
case None =>
activationStore.listActivationsInNamespace(namespace, skip.n,
limit.n, docs, since, upto, context)
}
- listEntities(activations map (_.fold((js) => js, (wa) =>
wa.map(_.toExtendedJson))))
+ listEntities(activations map (_.fold((js) => js, (wa) =>
wa.map(_.toExtendedJson()))))
}
}
}
@@ -191,7 +191,7 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
pathEndOrSingleSlash {
getEntity(
activationStore.get(ActivationId(docid.asString), context),
- postProcess = Some((activation: WhiskActivation) =>
complete(activation.toExtendedJson)))
+ postProcess = Some((activation: WhiskActivation) =>
complete(activation.toExtendedJson())))
} ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(context, docid) } ~
(pathPrefix(logsPath) & pathEnd) { fetchLogs(context, docid) }
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index b69536b..761a09b 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -1386,7 +1386,7 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
val response = responseAs[JsObject]
- response should be(activation.withoutLogs.toExtendedJson)
+ response should be(activation.withoutLogs.toExtendedJson())
}
// repeat invoke, get only result back
@@ -1422,7 +1422,7 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
val response = responseAs[JsObject]
- response should be(activation.withoutLogs.toExtendedJson)
+ response should be(activation.withoutLogs.toExtendedJson())
}
// repeat invoke, get only result back
@@ -1475,7 +1475,7 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
Post(s"$collectionPath/${action.name}?blocking=true&timeout=500") ~>
Route.seal(routes(creds)) ~> check {
status shouldBe OK
val response = responseAs[JsObject]
- response shouldBe activation.withoutLogs.toExtendedJson
+ response shouldBe activation.withoutLogs.toExtendedJson()
headers should contain(RawHeader(ActivationIdHeader,
response.fields("activationId").convertTo[String]))
}
} finally {
@@ -1504,7 +1504,7 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
val response = responseAs[JsObject]
- response should be(activation.withoutLogs.toExtendedJson)
+ response should be(activation.withoutLogs.toExtendedJson())
headers should contain(RawHeader(ActivationIdHeader,
response.fields("activationId").convertTo[String]))
}
} finally {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
index f540c6e..8ad2e4b 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
@@ -188,7 +188,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
status should be(OK)
val response = responseAs[List[JsObject]]
activations.length should be(response.length)
- response should contain theSameElementsAs
activations.map(_.toExtendedJson)
+ response should contain theSameElementsAs
activations.map(_.toExtendedJson())
}
}
} finally {
@@ -270,7 +270,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
status should be(OK)
val response = responseAs[List[JsObject]]
expected.length should be(response.length)
- response should contain theSameElementsAs
expected.map(_.toExtendedJson)
+ response should contain theSameElementsAs
expected.map(_.toExtendedJson())
}
}
}
@@ -286,7 +286,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
status should be(OK)
val response = responseAs[List[JsObject]]
expected.length should be(response.length)
- response should contain theSameElementsAs
expected.map(_.toExtendedJson)
+ response should contain theSameElementsAs
expected.map(_.toExtendedJson())
}
}
}
@@ -302,7 +302,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
status should be(OK)
val response = responseAs[List[JsObject]]
expected.length should be(response.length)
- response should contain theSameElementsAs
expected.map(_.toExtendedJson)
+ response should contain theSameElementsAs
expected.map(_.toExtendedJson())
}
}
}
@@ -538,14 +538,14 @@ class ActivationsApiTests extends ControllerTestCommon
with WhiskActivationsApi
Get(s"$collectionPath/${activation.activationId.asString}") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
val response = responseAs[JsObject]
- response should be(activation.toExtendedJson)
+ response should be(activation.toExtendedJson())
}
// it should "get activation by name in explicit namespace owned by
subject" in
Get(s"/$namespace/${collection.path}/${activation.activationId.asString}") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
val response = responseAs[JsObject]
- response should be(activation.toExtendedJson)
+ response should be(activation.toExtendedJson())
}
// it should "reject get activation by name in explicit namespace not
owned by subject" in
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index d623b3d..9e58639 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -125,7 +125,9 @@ protected trait ControllerTestCommon
() => {
val activations: Future[Either[List[JsObject], List[WhiskActivation]]]
=
activationStore.listActivationsInNamespace(namespace, 0, 0, context
= context)
- val listFuture: Future[List[JsObject]] = activations map (_.fold((js)
=> js, (wa) => wa.map(_.toExtendedJson)))
+ val listFuture: Future[List[JsObject]] = activations map (_.fold(
+ (js) => js,
+ (wa) => wa.map(_.toExtendedJson())))
listFuture map { l =>
if (l.length != count) {
@@ -146,7 +148,9 @@ protected trait ControllerTestCommon
() => {
val activations: Future[Either[List[JsObject], List[WhiskActivation]]]
=
activationStore.listActivationsMatchingName(namespace, name, 0, 0,
context = context)
- val listFuture: Future[List[JsObject]] = activations map (_.fold((js)
=> js, (wa) => wa.map(_.toExtendedJson)))
+ val listFuture: Future[List[JsObject]] = activations map (_.fold(
+ (js) => js,
+ (wa) => wa.map(_.toExtendedJson())))
listFuture map { l =>
if (l.length != count) {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
new file mode 100644
index 0000000..9253844
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/ArtifactWithFileStorageActivationStoreTests.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+import java.io.File
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpecLike, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
+import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size.SizeInt
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.io.Source
+
+/**
+ * Checks that [[ArtifactWithFileStorageActivationStore]] stores activations in the artifact store and
+ * also writes their log lines and metadata record, as JSON lines, to the log file.
+ */
+@RunWith(classOf[JUnitRunner])
+class ArtifactWithFileStorageActivationStoreTests()
+    extends TestKit(ActorSystem("ArtifactWithFileStorageActivationStoreTests"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+
+  implicit val transid: TransactionId = TransactionId.testing
+  implicit val notifier: Option[CacheChangeNotification] = None
+
+  private val materializer = ActorMaterializer()
+  private val uuid = UUID()
+  private val subject = Subject()
+  private val user =
+    Identity(subject, Namespace(EntityName("testSpace"), uuid), BasicAuthenticationAuthKey(uuid, Secret()), Set())
+  private val context = UserContext(user, HttpRequest())
+
+  /** Blocks until the future completes or the timeout (10 seconds by default) elapses. */
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+  /** Responses covering success without and with a result, an application error and a whisk error. */
+  def responsePermutations = {
+    val message = JsObject("result key" -> JsString("result value"))
+    Seq(
+      ActivationResponse.success(None),
+      ActivationResponse.success(Some(message)),
+      ActivationResponse.applicationError(message),
+      ActivationResponse.whiskError(message))
+  }
+
+  /** Logs with zero, one and two lines. */
+  def logPermutations = {
+    Seq(
+      ActivationLogs(),
+      ActivationLogs(Vector("2018-03-05T02:10:38.196689520Z stdout: single log line")),
+      ActivationLogs(
+        Vector(
+          "2018-03-05T02:10:38.196689522Z stdout: first log line of multiple lines",
+          "2018-03-05T02:10:38.196754258Z stdout: second log line of multiple lines")))
+  }
+
+  /** JSON objects expected in the log file for one activation: its log lines followed by its record. */
+  def expectedFileContent(activation: WhiskActivation) = {
+    val expectedLogs = activation.logs.logs.map { log =>
+      JsObject(
+        "type" -> "user_log".toJson,
+        "message" -> log.toJson,
+        "activationId" -> activation.activationId.toJson,
+        "namespaceId" -> user.namespace.uuid.toJson)
+    }
+    val expectedActivation = JsObject(
+      "type" -> "activation_record".toJson,
+      "duration" -> activation.duration.toJson,
+      "name" -> activation.name.toJson,
+      "subject" -> activation.subject.toJson,
+      "waitTime" -> activation.annotations.get("waitTime").toJson,
+      "activationId" -> activation.activationId.toJson,
+      "namespaceId" -> user.namespace.uuid.toJson,
+      "publish" -> activation.publish.toJson,
+      "version" -> activation.version.toJson,
+      "response" -> activation.response.withoutResult.toExtendedJson,
+      "end" -> activation.end.toEpochMilli.toJson,
+      "message" -> s"Activation record '${activation.activationId}' for entity '${activation.name}'".toJson,
+      "kind" -> activation.annotations.get("kind").toJson,
+      "start" -> activation.start.toEpochMilli.toJson,
+      "limits" -> activation.annotations.get("limits").toJson,
+      "initTime" -> activation.annotations.get("initTime").toJson,
+      "namespace" -> activation.namespace.toJson)
+
+    expectedLogs ++ Seq(expectedActivation)
+  }
+
+  it should "store activations in artifact store and to file" in {
+    val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", "logs", "namespaceId")
+    val activationStore = new ArtifactWithFileStorageActivationStore(system, materializer, logging, config)
+    val logDir = new File(new File(".").getCanonicalPath, config.logPath)
+
+    try {
+      logDir.mkdir
+
+      // Store, read back and delete one activation per response/log combination.
+      val activations = for {
+        response <- responsePermutations
+        logs <- logPermutations
+      } yield {
+        val activation = WhiskActivation(
+          namespace = EntityPath(subject.asString),
+          name = EntityName("name"),
+          subject = subject,
+          activationId = ActivationId.generate(),
+          start = Instant.now,
+          end = Instant.now,
+          response = response,
+          logs = logs,
+          duration = Some(101L),
+          annotations = Parameters("kind", "nodejs:6") ++ Parameters(
+            "limits",
+            ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
+            Parameters("waitTime", 16.toJson) ++
+            Parameters("initTime", 44.toJson))
+        val docInfo = await(activationStore.store(activation, context))
+        val fullyQualifiedActivationId = ActivationId(docInfo.id.asString)
+
+        await(activationStore.get(fullyQualifiedActivationId, context)) shouldBe activation
+        await(activationStore.delete(fullyQualifiedActivationId, context))
+        activation
+      }
+
+      // NOTE(review): the file is written by a stream independently of store(); this read assumes the
+      // lines have already reached the file - confirm this cannot race.
+      val logSource = Source.fromFile(activationStore.getLogFile.toFile.getAbsoluteFile)
+      val writtenLines = try {
+        logSource.getLines.toList.map(_.parseJson)
+      } finally {
+        // Release the file handle even when reading or parsing fails.
+        logSource.close()
+      }
+
+      writtenLines.toJson.convertTo[JsArray] shouldBe
+        activations.flatMap(expectedFileContent).toJson.convertTo[JsArray]
+    } finally {
+      activationStore.getLogFile.toFile.getAbsoluteFile.delete
+      logDir.delete
+    }
+  }
+
+}