diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5f7a8db5a0..56ec590963 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -231,8 +231,11 @@ object ConfigKeys {
val transactions = "whisk.transactions"
val logStore = "whisk.logstore"
+ val activationStore = "whisk.activationstore"
+
val splunk = s"$logStore.splunk"
val logStoreElasticSearch = s"$logStore.elasticsearch"
+ val elasticSearchActivationStore = s"$activationStore.elasticsearch"
val mesos = "whisk.mesos"
diff --git
a/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala
b/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala
new file mode 100644
index 0000000000..f6a93227b1
--- /dev/null
+++
b/common/scala/src/main/scala/whisk/core/database/ArtifactElasticSearchActivationStore.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.time.Instant
+import java.nio.file.{Files, Path, Paths}
+import java.nio.file.attribute.PosixFilePermission.{
+ GROUP_READ,
+ GROUP_WRITE,
+ OTHERS_READ,
+ OTHERS_WRITE,
+ OWNER_READ,
+ OWNER_WRITE
+}
+import java.util.EnumSet
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.stream.alpakka.file.scaladsl.LogRotatorSink
+import akka.stream.scaladsl.{Flow, MergeHub, RestartSink, Sink, Source}
+import akka.stream._
+import akka.util.ByteString
+import pureconfig.loadConfigOrThrow
+import spray.json._
+import whisk.common.{Logging, TransactionId}
+import whisk.core.ConfigKeys
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.duration._
+import scala.util.Try
+
+/**
+ * Activation store that writes through to the parent [[ArtifactActivationStore]], mirrors every
+ * stored activation as one JSON line into a rotating file under `logs`, and answers reads from
+ * ElasticSearch whenever the request carries every header named in
+ * `elasticSearchConfig.requiredHeaders`. Requests lacking one of those headers are delegated to
+ * the parent store.
+ *
+ * @param system actor system, also exposed to the ElasticSearch REST client
+ * @param actorMaterializer materializer used to run the file-writing stream
+ * @param logging logger used for file rotation and file creation failures
+ * @param httpFlow optional HTTP flow handed to the ElasticSearch REST client
+ * @param elasticSearchConfig connection and schema settings; loaded from
+ *                            `ConfigKeys.elasticSearchActivationStore` when not supplied
+ */
+class ArtifactElasticSearchActivationStore(
+  override val system: ActorSystem,
+  actorMaterializer: ActorMaterializer,
+  logging: Logging,
+  override val httpFlow: Option[
+    Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  override val elasticSearchConfig: ElasticSearchActivationStoreConfig =
+    loadConfigOrThrow[ElasticSearchActivationStoreConfig](ConfigKeys.elasticSearchActivationStore))
+    extends ArtifactActivationStore(system, actorMaterializer, logging)
+    with ElasticSearchActivationRestClient {
+
+  implicit val m = actorMaterializer
+
+  /** Directory receiving the rotated activation record files. */
+  val destinationDirectory: Path = Paths.get("logs")
+
+  /** Upper bound for both the in-memory batch and a single record file. */
+  val bufferSize = 100.MB
+
+  /** Permissions applied to every newly created record file (read/write for owner, group and others). */
+  val perms = EnumSet.of(OWNER_READ, OWNER_WRITE, GROUP_READ, GROUP_WRITE, OTHERS_READ, OTHERS_WRITE)
+
+  /**
+   * Shared sink for serialized activations. Producers attach through the MergeHub; elements are
+   * batched up to `bufferSize` and handed to a LogRotatorSink that is restarted with backoff on failure.
+   */
+  protected val writeToFile: Sink[ByteString, _] = MergeHub
+    .source[ByteString]
+    .batchWeighted(bufferSize.toBytes, _.length, identity)(_ ++ _)
+    .to(RestartSink.withBackoff(minBackoff = 1.seconds, maxBackoff = 60.seconds, randomFactor = 0.2) { () =>
+      LogRotatorSink(() => {
+        val maxSize = bufferSize.toBytes
+        // Starts at the limit so that the first non-empty element opens a fresh file.
+        var bytesRead = maxSize
+        element =>
+          {
+            val size = element.size
+            if (bytesRead + size > maxSize) {
+              bytesRead = size
+              val logFilePath = destinationDirectory.resolve(s"userlogs-${Instant.now.toEpochMilli}.log")
+              logging.info(this, s"Rotating log file to '$logFilePath'")
+              try {
+                Files.createFile(logFilePath)
+                Files.setPosixFilePermissions(logFilePath, perms)
+              } catch {
+                // Fatal errors (e.g. OutOfMemoryError) propagate untouched; everything else is logged and rethrown.
+                case scala.util.control.NonFatal(t) =>
+                  logging.error(this, s"Couldn't create activation record file '$t'")
+                  throw t
+              }
+              // Some(path) makes the rotator sink switch to the new file.
+              Some(logFilePath)
+            } else {
+              bytesRead += size
+              None
+            }
+          }
+      })
+    })
+    .run()
+
+  /**
+   * Selects the request headers whose lower-cased name is listed in
+   * `elasticSearchConfig.requiredHeaders`. Configured names must therefore be lower case.
+   */
+  protected def extractRequiredHeaders(headers: Seq[HttpHeader]): List[HttpHeader] =
+    headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
+
+  /**
+   * True when every configured required header occurs at least once in `headers`. A presence check
+   * is used instead of comparing list lengths, which misfires on repeated request headers or
+   * repeated configuration entries.
+   */
+  private def hasAllRequiredHeaders(headers: List[HttpHeader]): Boolean =
+    elasticSearchConfig.requiredHeaders.forall(required => headers.exists(_.lowercaseName == required))
+
+  /** Applies the configured path format to the namespace UUID of the requesting user. */
+  private def searchPath(context: UserContext): String =
+    elasticSearchConfig.path.format(context.user.namespace.uuid.asString)
+
+  /**
+   * Serializes the activation, enriched with the caller's namespace UUID, as a single JSON line and
+   * pushes it into `writeToFile`. Annotation keys are merged after the fixed fields, so an annotation
+   * sharing a key with a fixed field replaces it; `duration` is merged last.
+   *
+   * The `result` field is emitted only when the activation response carries a result, so that a
+   * result-less activation no longer raises while being stored.
+   */
+  protected def writeActivation(activation: WhiskActivation, context: UserContext) = {
+    val fixedFields = Map(
+      "namespaceId" -> context.user.namespace.uuid.toJson,
+      "namespace" -> activation.namespace.namespace.toJson,
+      "name" -> activation.name.toJson,
+      "subject" -> activation.subject.toJson,
+      "activationId" -> activation.activationId.toJson,
+      "start" -> activation.start.toEpochMilli.toJson,
+      "end" -> activation.end.toEpochMilli.toJson,
+      "cause" -> activation.cause.toJson,
+      "statusCode" -> activation.response.statusCode.toJson,
+      "logs" -> activation.logs.toJson,
+      "version" -> activation.version.toJson)
+    // The result is stored as a JSON string, not as a nested object.
+    val result = activation.response.result
+      .map(r => Map("result" -> r.compactPrint.toJson))
+      .getOrElse(Map.empty[String, JsValue])
+    val annotations = activation.annotations.toJsObject.fields
+    val duration = activation.duration
+      .map(d => Map("duration" -> d.toJson))
+      .getOrElse(Map.empty[String, JsValue])
+    val augmentedActivation = JsObject(fixedFields ++ result ++ annotations ++ duration)
+    val line = ByteString(augmentedActivation.compactPrint + "\n")
+
+    Source.single(line).runWith(Flow[ByteString].to(writeToFile))
+  }
+
+  /** Mirrors the activation to the record file, then stores it through the parent store. */
+  override def store(activation: WhiskActivation, context: UserContext)(
+    implicit transid: TransactionId,
+    notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+    writeActivation(activation, context)
+    super.store(activation, context)
+  }
+
+  /**
+   * Fetches one activation: from ElasticSearch when all required headers are present, from the
+   * parent store otherwise. Only the part of the id after the first `/` is sent to ElasticSearch.
+   */
+  override def get(activationId: ActivationId, context: UserContext)(
+    implicit transid: TransactionId): Future[WhiskActivation] = {
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    if (hasAllRequiredHeaders(headers)) {
+      val id = activationId.asString.substring(activationId.asString.indexOf("/") + 1)
+
+      getActivation(id, searchPath(context), headers).map(_.toActivation())
+    } else {
+      super.get(activationId, context)
+    }
+  }
+
+  /**
+   * Counts activations: through ElasticSearch when all required headers are present, through the
+   * parent store otherwise.
+   */
+  override def countActivationsInNamespace(namespace: EntityPath,
+                                           name: Option[EntityPath] = None,
+                                           skip: Int,
+                                           since: Option[Instant] = None,
+                                           upto: Option[Instant] = None,
+                                           context: UserContext)(implicit transid: TransactionId): Future[JsObject] = {
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    if (hasAllRequiredHeaders(headers)) {
+      count(searchPath(context), name, namespace.asString, skip, since, upto, headers)
+    } else {
+      super.countActivationsInNamespace(namespace, name, skip, since, upto, context)
+    }
+  }
+
+  /**
+   * Lists activations of one entity: through ElasticSearch (always as full activations, i.e. `Right`)
+   * when all required headers are present, through the parent store otherwise.
+   */
+  override def listActivationsMatchingName(
+    namespace: EntityPath,
+    name: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    if (hasAllRequiredHeaders(headers)) {
+      listActivationMatching(searchPath(context), name.toString, skip, limit, since, upto, headers).map {
+        activationList =>
+          Right(activationList.map(activation => activation.toActivation()))
+      }
+    } else {
+      super.listActivationsMatchingName(namespace, name, skip, limit, includeDocs, since, upto, context)
+    }
+  }
+
+  /**
+   * Lists activations of a namespace: through ElasticSearch (always as full activations, i.e. `Right`)
+   * when all required headers are present, through the parent store otherwise.
+   */
+  override def listActivationsInNamespace(
+    namespace: EntityPath,
+    skip: Int,
+    limit: Int,
+    includeDocs: Boolean = false,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    context: UserContext)(implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
+    val headers = extractRequiredHeaders(context.request.headers)
+
+    if (hasAllRequiredHeaders(headers)) {
+      listActivationsNamespace(searchPath(context), namespace.asString, skip, limit, since, upto, headers).map {
+        activationList =>
+          Right(activationList.map(activation => activation.toActivation()))
+      }
+    } else {
+      super.listActivationsInNamespace(namespace, skip, limit, includeDocs, since, upto, context)
+    }
+  }
+
+}
+
+/**
+ * [[ActivationStoreProvider]] that builds an [[ArtifactElasticSearchActivationStore]], leaving the
+ * store's HTTP flow and ElasticSearch configuration at their defaults.
+ */
+object ArtifactElasticSearchActivationStoreProvider extends ActivationStoreProvider {
+  override def instance(actorSystem: ActorSystem, actorMaterializer: ActorMaterializer, logging: Logging) =
+    new ArtifactElasticSearchActivationStore(
+      system = actorSystem,
+      actorMaterializer = actorMaterializer,
+      logging = logging)
+}
diff --git
a/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala
b/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala
new file mode 100644
index 0000000000..c1a5567362
--- /dev/null
+++
b/common/scala/src/main/scala/whisk/core/database/ElasticSearchActivationRestClient.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.database
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.stream.scaladsl.Flow
+
+import spray.json.{DefaultJsonProtocol, _}
+
+import whisk.common.TransactionId
+import whisk.core.containerpool.logging.ElasticSearchJsonProtocol._
+import whisk.core.containerpool.logging.{ElasticSearchRestClient, EsQuery,
EsQueryString, EsSearchResult, _}
+import whisk.core.entity._
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Try
+
+/**
+ * Names of the fields under which activation attributes are found in ElasticSearch documents.
+ * `start` doubles as the field used for time-range filters and for sorting; `activationRecord`
+ * is the value that identifies a document as an activation record in queries.
+ */
+case class ElasticSearchActivationFieldConfig(
+  name: String,
+  namespace: String,
+  subject: String,
+  version: String,
+  start: String,
+  end: String,
+  duration: String,
+  result: String,
+  statusCode: String,
+  activationId: String,
+  activationRecord: String,
+  stream: String)
+
+/**
+ * Settings of the ElasticSearch backed activation store.
+ *
+ * @param protocol protocol used to reach ElasticSearch
+ * @param host ElasticSearch host
+ * @param port ElasticSearch port
+ * @param path search path; used as a format string that receives the namespace UUID
+ * @param schema field names of activation documents
+ * @param requiredHeaders request header names (lower case) that must be present for reads to be
+ *                        served from ElasticSearch; empty by default
+ */
+case class ElasticSearchActivationStoreConfig(
+  protocol: String,
+  host: String,
+  port: Int,
+  path: String,
+  schema: ElasticSearchActivationFieldConfig,
+  requiredHeaders: Seq[String] = Seq.empty)
+
+/**
+ * Read-side client for activations stored in ElasticSearch. Implementors supply the execution
+ * context, actor system, optional HTTP flow and configuration; the trait builds the search
+ * payloads, runs them through an [[ElasticSearchRestClient]] and converts the hits into
+ * [[WhiskActivation]]s.
+ */
+trait ElasticSearchActivationRestClient {
+  implicit val executionContext: ExecutionContext
+  implicit val system: ActorSystem
+  val httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]]
+  val elasticSearchConfig: ElasticSearchActivationStoreConfig
+
+  protected val esActivationClient =
+    new ElasticSearchRestClient(
+      elasticSearchConfig.protocol,
+      elasticSearchConfig.host,
+      elasticSearchConfig.port,
+      httpFlow)
+
+  /**
+   * Schema of resultant activations from ES. `result` holds the activation result as a JSON
+   * string; `statusCode` selects how that result is turned into an [[ActivationResponse]].
+   */
+  case class ActivationEntry(name: String,
+                             subject: String,
+                             activationId: String,
+                             version: String,
+                             start: Long,
+                             end: Long,
+                             result: String,
+                             statusCode: Int,
+                             duration: Option[Long] = None,
+                             namespace: String,
+                             kind: Option[String] = None,
+                             cause: Option[String] = None,
+                             causedBy: Option[String] = None,
+                             limits: Option[ActionLimits] = None,
+                             path: Option[String] = None,
+                             logs: ActivationLogs,
+                             waitTime: Option[Int] = None,
+                             initTime: Option[Int] = None) {
+
+    /**
+     * Rebuilds the [[WhiskActivation]] described by this entry. Optional attributes that are
+     * present become annotations (`kind`, `causedBy`, `limits`, `path`, `waitTime`, `initTime`).
+     *
+     * Throws `IllegalArgumentException` when `statusCode` is not one of 0 to 3. Parsing failures of
+     * `result`, or a missing `error` field for status codes 1 to 3, also surface as exceptions.
+     */
+    def toActivation(): WhiskActivation = {
+      // The stored result is a JSON string; parse it once for whichever branch applies.
+      val resultObject = result.parseJson.asJsObject
+      val response = statusCode match {
+        case 0 => ActivationResponse.success(Some(resultObject))
+        case 1 => ActivationResponse.applicationError(resultObject.fields("error"))
+        case 2 => ActivationResponse.containerError(resultObject.fields("error"))
+        case 3 => ActivationResponse.whiskError(resultObject.fields("error"))
+        case other =>
+          // Fail with a descriptive message instead of a bare MatchError.
+          throw new IllegalArgumentException(s"Unsupported activation status code '$other'")
+      }
+
+      // Turns an optional value into a single-entry annotation, or into no annotation at all.
+      def optionalAnnotation(key: String, value: Option[JsValue]): Parameters =
+        value.map(v => Parameters(key, v)).getOrElse(Parameters())
+
+      val kindAnnotation = optionalAnnotation("kind", kind.map(_.toJson))
+      val causedByAnnotation = optionalAnnotation("causedBy", causedBy.map(_.toJson))
+      val memoryAnnotation = optionalAnnotation(
+        "limits",
+        limits.map { value =>
+          JsObject(
+            "memory" -> value.memory.megabytes.toJson,
+            "timeout" -> value.timeout.toJson,
+            "logs" -> value.logs.toJson)
+        })
+      val pathAnnotation = optionalAnnotation("path", path.map(_.toJson))
+      val waitTimeAnnotation = optionalAnnotation("waitTime", waitTime.map(_.toJson))
+      val initTimeAnnotation = optionalAnnotation("initTime", initTime.map(_.toJson))
+
+      WhiskActivation(
+        EntityPath(namespace),
+        EntityName(name),
+        Subject(subject),
+        ActivationId(activationId),
+        Instant.ofEpochMilli(start),
+        Instant.ofEpochMilli(end),
+        response = response,
+        logs = logs,
+        duration = duration,
+        version = SemVer(version),
+        annotations = kindAnnotation ++ causedByAnnotation ++ memoryAnnotation ++ pathAnnotation ++ waitTimeAnnotation ++ initTimeAnnotation,
+        cause = cause.map(value => ActivationId(value)))
+    }
+  }
+
+  object ActivationEntry extends DefaultJsonProtocol {
+    // Field names follow the configured schema where one is used below; the remaining names are fixed.
+    // NOTE(review): `result` and `statusCode` are hard-coded although the schema config declares
+    // entries for them - confirm whether the configured names should be used here.
+    implicit val serdes =
+      jsonFormat(
+        ActivationEntry.apply,
+        elasticSearchConfig.schema.name,
+        elasticSearchConfig.schema.subject,
+        elasticSearchConfig.schema.activationId,
+        elasticSearchConfig.schema.version,
+        elasticSearchConfig.schema.start,
+        elasticSearchConfig.schema.end,
+        "result",
+        "statusCode",
+        elasticSearchConfig.schema.duration,
+        elasticSearchConfig.schema.namespace,
+        "kind",
+        "cause",
+        "causedBy",
+        "limits",
+        "path",
+        "logs",
+        "waitTime",
+        "initTime")
+  }
+
+  /** Deserializes the source document of every hit into an [[ActivationEntry]]. */
+  protected def transcribeActivations(queryResult: EsSearchResult): List[ActivationEntry] =
+    queryResult.hits.hits.map(_.source.convertTo[ActivationEntry]).toList
+
+  /**
+   * Builds the range filters on the configured start field: "greater than `since`" and
+   * "less than `upto`", each only when the corresponding bound is given.
+   */
+  protected def getRanges(since: Option[Instant] = None, upto: Option[Instant] = None) = {
+    val sinceRange: Option[EsQueryRange] =
+      since.map(time => EsQueryRange(elasticSearchConfig.schema.start, EsRangeGt, time.toEpochMilli.toString))
+    val uptoRange: Option[EsQueryRange] =
+      upto.map(time => EsQueryRange(elasticSearchConfig.schema.start, EsRangeLt, time.toEpochMilli.toString))
+
+    Vector(sinceRange, uptoRange).flatten
+  }
+
+  /** Query-string payload selecting the activation record with the given id. */
+  protected def generateGetPayload(activationId: String) = {
+    val query =
+      s"type: ${elasticSearchConfig.schema.activationRecord} AND ${elasticSearchConfig.schema.activationId}: $activationId"
+
+    EsQuery(EsQueryString(query))
+  }
+
+  /**
+   * Payload for counting activation records, optionally restricted to one entity name and a time
+   * range, sorted by start time descending and offset by `skip`.
+   */
+  protected def generateCountActivationsInNamespacePayload(name: Option[EntityPath] = None,
+                                                           skip: Int,
+                                                           since: Option[Instant] = None,
+                                                           upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto)
+    val activationMatch = Some(EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord))
+    val entityMatch: Option[EsQueryBoolMatch] =
+      name.map(n => EsQueryBoolMatch(elasticSearchConfig.schema.name, n.asString))
+    val queryTerms = Vector(activationMatch, entityMatch).flatten
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc)
+
+    EsQuery(queryMust, Some(queryOrder), from = skip)
+  }
+
+  /**
+   * Payload for listing the activation records of one entity name within an optional time range,
+   * sorted by start time descending, limited to `limit` hits and offset by `skip`.
+   */
+  protected def generateListActiationsMatchNamePayload(name: String,
+                                                       skip: Int,
+                                                       limit: Int,
+                                                       since: Option[Instant] = None,
+                                                       upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto)
+    val queryTerms = Vector(
+      EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord),
+      EsQueryBoolMatch(elasticSearchConfig.schema.name, name))
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc)
+
+    EsQuery(queryMust, Some(queryOrder), Some(limit), from = skip)
+  }
+
+  /**
+   * Payload for listing the activation records of a namespace within an optional time range,
+   * sorted by start time descending, limited to `limit` hits and offset by `skip`.
+   * The namespace is matched against the configured subject field.
+   */
+  protected def generateListActivationsInNamespacePayload(namespace: String,
+                                                          skip: Int,
+                                                          limit: Int,
+                                                          since: Option[Instant] = None,
+                                                          upto: Option[Instant] = None) = {
+    val queryRanges = getRanges(since, upto)
+    val queryTerms = Vector(
+      EsQueryBoolMatch("type", elasticSearchConfig.schema.activationRecord),
+      EsQueryBoolMatch(elasticSearchConfig.schema.subject, namespace))
+    val queryMust = EsQueryMust(queryTerms, queryRanges)
+    val queryOrder = EsQueryOrder(elasticSearchConfig.schema.start, EsOrderDesc)
+
+    EsQuery(queryMust, Some(queryOrder), Some(limit), from = skip)
+  }
+
+  /**
+   * Continues a search with `onSuccess` when it produced a result, and fails the future with a
+   * `RuntimeException` naming the returned status code otherwise.
+   */
+  private def withSearchResult[T](search: Future[Either[Any, EsSearchResult]])(
+    onSuccess: EsSearchResult => Future[T]): Future[T] =
+    search.flatMap {
+      case Right(queryResult) => onSuccess(queryResult)
+      case Left(code) =>
+        Future.failed(new RuntimeException(s"Status code '$code' was returned from activation store"))
+    }
+
+  /**
+   * Looks up a single activation. Fails with `NoDocumentException` when the search has no hits and
+   * with `RuntimeException` when the search itself is rejected.
+   */
+  def getActivation(activationId: String, uuid: String, headers: List[HttpHeader] = List.empty)(
+    implicit transid: TransactionId): Future[ActivationEntry] = {
+    val payload = generateGetPayload(activationId)
+
+    withSearchResult(esActivationClient.search[EsSearchResult](uuid, payload, headers)) { queryResult =>
+      transcribeActivations(queryResult).headOption match {
+        case Some(entry) => Future.successful(entry)
+        case None        => Future.failed(new NoDocumentException("Document not found"))
+      }
+    }
+  }
+
+  /**
+   * Counts matching activations. The reported number is the total hit count minus `skip`, never
+   * below zero, wrapped as `{"activations": n}`.
+   */
+  def count(uuid: String,
+            name: Option[EntityPath] = None,
+            namespace: String,
+            skip: Int,
+            since: Option[Instant] = None,
+            upto: Option[Instant] = None,
+            headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[JsObject] = {
+    val payload = generateCountActivationsInNamespacePayload(name, skip, since, upto)
+
+    withSearchResult(esActivationClient.search[EsSearchResult](uuid, payload, headers)) { queryResult =>
+      val total = Math.max(0, queryResult.hits.total - skip)
+      Future.successful(JsObject("activations" -> total.toJson))
+    }
+  }
+
+  /** Lists the activations of one entity name; fails with `RuntimeException` when the search is rejected. */
+  def listActivationMatching(
+    uuid: String,
+    name: String,
+    skip: Int,
+    limit: Int,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[List[ActivationEntry]] = {
+    val payload = generateListActiationsMatchNamePayload(name, skip, limit, since, upto)
+
+    withSearchResult(esActivationClient.search[EsSearchResult](uuid, payload, headers)) { queryResult =>
+      Future.successful(transcribeActivations(queryResult))
+    }
+  }
+
+  /** Lists the activations of a namespace; fails with `RuntimeException` when the search is rejected. */
+  def listActivationsNamespace(
+    uuid: String,
+    namespace: String,
+    skip: Int,
+    limit: Int,
+    since: Option[Instant] = None,
+    upto: Option[Instant] = None,
+    headers: List[HttpHeader] = List.empty)(implicit transid: TransactionId): Future[List[ActivationEntry]] = {
+    val payload = generateListActivationsInNamespacePayload(namespace, skip, limit, since, upto)
+
+    withSearchResult(esActivationClient.search[EsSearchResult](uuid, payload, headers)) { queryResult =>
+      Future.successful(transcribeActivations(queryResult))
+    }
+  }
+
+}
diff --git a/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
b/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
index 6038bd012c..287ef76ab1 100644
--- a/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/SemVer.scala
@@ -78,7 +78,7 @@ protected[core] object SemVer {
* @return SemVer instance
* @thrown IllegalArgumentException if string is not a valid semantic version
*/
- protected[entity] def apply(str: String): SemVer = {
+ protected[core] def apply(str: String): SemVer = {
try {
val parts = if (str != null && str.nonEmpty) str.split('.') else
Array[String]()
val major = if (parts.size >= 1) parts(0).toInt else 0
diff --git a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
index 01fed1a44e..fdb5f02dae 100644
--- a/tests/src/test/scala/system/basic/WskRestBasicTests.scala
+++ b/tests/src/test/scala/system/basic/WskRestBasicTests.scala
@@ -1036,7 +1036,7 @@ class WskRestBasicTests extends TestHelpers with
WskTestHelpers with WskActorSys
result.getFieldJsValue("start").toString should not be
JsObject.empty.toString
result.getFieldJsValue("end").toString shouldBe JsObject.empty.toString
result.getFieldJsValue("duration").toString shouldBe
JsObject.empty.toString
- result.getFieldListJsObject("annotations").length shouldBe 0
+ result.getFieldListJsObject("annotations").length shouldBe 1
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala
b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala
new file mode 100644
index 0000000000..f71d3d8f9e
--- /dev/null
+++
b/tests/src/test/scala/whisk/core/containerpool/logging/ElasticSearchActivationStoreTests.scala
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import java.time.Instant
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.Accept
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.Flow
+import akka.testkit.TestKit
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpecLike, Matchers}
+import pureconfig.error.ConfigReaderException
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.core.entity._
+import whisk.core.database.{
+ ArtifactElasticSearchActivationStore,
+ ElasticSearchActivationFieldConfig,
+ ElasticSearchActivationStoreConfig,
+ NoDocumentException,
+ UserContext
+}
+
+import whisk.common.TransactionId
+import whisk.core.entity.size.SizeInt
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future, Promise}
+import scala.util.{Success, Try}
+
+/*
+TODO:
+Add test coverage for the required headers configuration
+ */
+@RunWith(classOf[JUnitRunner])
+class ElasticSearchActivationStoreTests
+ extends TestKit(ActorSystem("ElasticSearchActivationStore"))
+ with FlatSpecLike
+ with Matchers
+ with ScalaFutures
+ with StreamLogging {
+
+ val materializer = ActorMaterializer()
+
+ implicit val transid: TransactionId = TransactionId.testing
+
+ private val uuid = UUID()
+ private val subject = Subject()
+ private val user =
+ Identity(subject, Namespace(EntityName("testSpace"), uuid),
BasicAuthenticationAuthKey(uuid, Secret()), Set())
+ private val activationId = ActivationId.generate()
+ private val namespace = EntityPath("namespace")
+ private val name = EntityName("name")
+ private val response = JsObject("result key" -> JsString("result value"))
+ private val start = Instant.now
+ private val end = Instant.now
+ private val since = Instant.now
+ private val upto = Instant.now
+ private val logs =
+ Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff",
"2018-03-05T02:10:38.196754258Z stdout: more logs")
+ private val expectedLogs = ActivationLogs(logs)
+ private val activation = WhiskActivation(
+ namespace = namespace,
+ name = name,
+ subject,
+ activationId = activationId,
+ start = start,
+ end = end,
+ response = ActivationResponse.success(Some(response)),
+ logs = expectedLogs,
+ duration = Some(101L),
+ annotations = Parameters("kind", "nodejs:6") ++ Parameters(
+ "limits",
+ ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB),
LogLimit(10.MB)).toJson) ++
+ Parameters("waitTime", 16.toJson) ++
+ Parameters("initTime", 44.toJson))
+
+ // Elasticsearch configuration
+ private val defaultSchema =
+ ElasticSearchActivationFieldConfig(
+ "name",
+ "namespace",
+ "subject",
+ "version",
+ "start",
+ "end",
+ "duration",
+ "result",
+ "statusCode",
+ "activationId",
+ "activation_record",
+ "stream")
+ private val defaultConfig =
+ ElasticSearchActivationStoreConfig("https", "host", 443,
"/whisk_user_logs/_search", defaultSchema)
+
+ // Elasticsearch query responses
+ private val defaultQueryResponse =
+ JsObject(
+ "took" -> 4.toJson,
+ "timed_out" -> false.toJson,
+ "_shards" -> JsObject("total" -> 5.toJson, "successful" -> 5.toJson,
"failed" -> 0.toJson),
+ "hits" -> JsObject(
+ "total" -> 2.toJson,
+ "max_score" -> 3.74084.toJson,
+ "hits" -> JsArray(
+ JsObject(
+ "_index" -> "whisk_user_logs".toJson,
+ "_type" -> defaultConfig.schema.activationRecord.toJson,
+ "_id" -> "AWUQoCrVV6WHiq7A5LL8".toJson,
+ "_score" -> 3.74084.toJson,
+ "_source" -> JsObject(
+ defaultConfig.schema.statusCode -> 0.toJson,
+ defaultConfig.schema.duration -> 101.toJson,
+ defaultConfig.schema.name -> name.toJson,
+ defaultConfig.schema.subject -> subject.toJson,
+ "waitTime" -> 16.toJson,
+ defaultConfig.schema.activationId -> activationId.toJson,
+ defaultConfig.schema.result -> response.compactPrint.toJson,
+ defaultConfig.schema.version -> "0.0.1".toJson,
+ "cause" -> JsNull,
+ defaultConfig.schema.end -> end.toEpochMilli.toJson,
+ "kind" -> "nodejs:6".toJson,
+ "logs" -> logs.toJson,
+ defaultConfig.schema.start -> start.toEpochMilli.toJson,
+ "limits" -> JsObject("timeout" -> 60000.toJson, "memory" ->
256.toJson, "logs" -> 10.toJson),
+ "initTime" -> 44.toJson,
+ defaultConfig.schema.namespace -> namespace.toJson,
+ "@version" -> "1".toJson,
+ "type" -> defaultConfig.schema.activationRecord.toJson,
+ "ALCH_TENANT_ID" ->
"9cfe57a0-7ac1-4bf4-9026-d7e9e591271f".toJson // UUID
+ )),
+ JsObject(
+ "_index" -> "whisk_user_logs".toJson,
+ "_type" -> defaultConfig.schema.activationRecord.toJson,
+ "_id" -> "AWUQoCrVV6WHiq7A5LL8".toJson,
+ "_score" -> 3.74084.toJson,
+ "_source" -> JsObject(
+ defaultConfig.schema.statusCode -> 0.toJson,
+ defaultConfig.schema.duration -> 101.toJson,
+ defaultConfig.schema.name -> name.toJson,
+ defaultConfig.schema.subject -> subject.toJson,
+ "waitTime" -> 16.toJson,
+ defaultConfig.schema.activationId -> activationId.toJson,
+ defaultConfig.schema.result -> response.compactPrint.toJson,
+ defaultConfig.schema.version -> "0.0.1".toJson,
+ "cause" -> JsNull,
+ defaultConfig.schema.end -> end.toEpochMilli.toJson,
+ "kind" -> "nodejs:6".toJson,
+ "logs" -> logs.toJson,
+ defaultConfig.schema.start -> start.toEpochMilli.toJson,
+ "limits" -> JsObject("timeout" -> 60000.toJson, "memory" ->
256.toJson, "logs" -> 10.toJson),
+ "initTime" -> 44.toJson,
+ defaultConfig.schema.namespace -> namespace.toJson,
+ "@version" -> "1".toJson,
+ "type" -> defaultConfig.schema.activationRecord.toJson,
+ "ALCH_TENANT_ID" ->
"9cfe57a0-7ac1-4bf4-9026-d7e9e591271f".toJson // UUID
+ )))))
+
+ // Elasticsearch query requests
+ private val defaultPayload = JsObject(
+ "query" -> JsObject(
+ "query_string" -> JsObject("query" -> JsString(
+ s"_type: ${defaultConfig.schema.activationRecord} AND
${defaultConfig.schema.activationId}: $activationId"))),
+ "from" -> JsNumber(0)).compactPrint
+ private val defaultGetPayload = JsObject(
+ "query" -> JsObject(
+ "query_string" -> JsObject("query" -> JsString(
+ s"_type: ${defaultConfig.schema.activationRecord} AND
${defaultConfig.schema.activationId}: $activationId"))),
+ "from" -> JsNumber(0)).compactPrint
+ private val defaultCountPayload = JsObject(
+ "query" -> JsObject(
+ "bool" -> JsObject(
+ "must" -> JsArray(
+ JsObject("match" -> JsObject("_type" ->
JsString(defaultConfig.schema.activationRecord))),
+ JsObject("match" -> JsObject(defaultConfig.schema.name ->
JsString(name.name)))),
+ "filter" -> JsArray(
+ JsObject(
+ "range" -> JsObject(defaultConfig.schema.start -> JsObject("gt" ->
JsString(since.toEpochMilli.toString)))),
+ JsObject("range" -> JsObject(
+ defaultConfig.schema.start -> JsObject("lt" ->
JsString(upto.toEpochMilli.toString))))))),
+ "sort" -> JsArray(JsObject(defaultConfig.schema.start -> JsObject("order"
-> JsString("desc")))),
+ "from" -> JsNumber(1)).compactPrint
+ private val defaultListEntityPayload = JsObject(
+ "query" -> JsObject(
+ "bool" -> JsObject(
+ "must" -> JsArray(
+ JsObject("match" -> JsObject("_type" ->
JsString(defaultConfig.schema.activationRecord))),
+ JsObject("match" -> JsObject(defaultConfig.schema.name ->
JsString(name.name)))),
+ "filter" -> JsArray(
+ JsObject(
+ "range" -> JsObject(defaultConfig.schema.start -> JsObject("gt" ->
JsString(since.toEpochMilli.toString)))),
+ JsObject("range" -> JsObject(
+ defaultConfig.schema.start -> JsObject("lt" ->
JsString(upto.toEpochMilli.toString))))))),
+ "sort" -> JsArray(JsObject(defaultConfig.schema.start -> JsObject("order"
-> JsString("desc")))),
+ "size" -> JsNumber(2),
+ "from" -> JsNumber(1)).compactPrint
+ private val defaultListPayload = JsObject(
+ "query" -> JsObject(
+ "bool" -> JsObject(
+ "must" -> JsArray(
+ JsObject("match" -> JsObject("_type" ->
JsString(defaultConfig.schema.activationRecord))),
+ JsObject("match" -> JsObject(defaultConfig.schema.subject ->
JsString(user.namespace.name.asString)))),
+ "filter" -> JsArray(
+ JsObject(
+ "range" -> JsObject(defaultConfig.schema.start -> JsObject("gt" ->
JsString(since.toEpochMilli.toString)))),
+ JsObject("range" -> JsObject(
+ defaultConfig.schema.start -> JsObject("lt" ->
JsString(upto.toEpochMilli.toString))))))),
+ "sort" -> JsArray(JsObject(defaultConfig.schema.start -> JsObject("order"
-> JsString("desc")))),
+ "size" -> JsNumber(2),
+ "from" -> JsNumber(1)).compactPrint
+
+ // Elasticsearch HTTP responses
+ private val defaultHttpResponse = HttpResponse(
+ StatusCodes.OK,
+ entity = HttpEntity(ContentTypes.`application/json`,
defaultQueryResponse.compactPrint))
+ private val emptyHttpResponse = HttpResponse(
+ StatusCodes.OK,
+ entity = HttpEntity(
+ ContentTypes.`application/json`,
+
s"""{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}"""))
+
+ // Elasticsearch HTTP requests
+  // POST to the default search path carrying defaultPayload as a JSON body.
+  private val defaultHttpRequest = HttpRequest(
+    method = POST,
+    uri = Uri("/whisk_user_logs/_search"),
+    headers = List(Accept(MediaTypes.`application/json`)),
+    entity = HttpEntity(ContentTypes.`application/json`, defaultPayload))
+  // Placeholder request handed to every UserContext built in this spec.
+  private val defaultLogStoreHttpRequest =
+    HttpRequest(GET, Uri("https://some.url"), entity = HttpEntity.Empty)
+
+  /**
+   * Builds a stub HTTP flow for the store under test.
+   *
+   * Every request passing through the flow is asserted to equal `httpRequest`; the
+   * flow then answers with `httpResponse`, paired with the promise that arrived
+   * alongside the request.
+   *
+   * @param httpResponse the canned response to emit for each request
+   * @param httpRequest the request each incoming element is expected to equal
+   */
+  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])].mapAsyncUnordered(1) { incoming =>
+      val (actualRequest, responsePromise) = incoming
+      actualRequest shouldBe httpRequest
+      Future.successful((Success(httpResponse), responsePromise))
+    }
+
+  /**
+   * Blocks until `awaitable` completes and returns its value, rethrowing its failure.
+   *
+   * @param awaitable the future to wait for
+   * @param timeout the maximum time to wait, 10 seconds unless overridden
+   */
+  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T =
+    Await.result(awaitable, timeout)
+
+ behavior of "ElasticSearch Activation Store"
+
+  it should "fail to connect to invalid host" in {
+    // Unlike the other tests, no stubbed HTTP flow is passed to the store here,
+    // and the lookup is expected to fail.
+    val store = new ArtifactElasticSearchActivationStore(
+      system,
+      materializer,
+      logging,
+      elasticSearchConfig = defaultConfig)
+
+    a[Throwable] should be thrownBy {
+      await(store.get(activation.activationId, UserContext(user, defaultLogStoreHttpRequest)))
+    }
+  }
+
+  it should "get an activation" in {
+    // The stubbed flow asserts that the store sends exactly this search request.
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    val stubbedFlow = testFlow(defaultHttpResponse, expectedRequest)
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(stubbedFlow),
+        elasticSearchConfig = defaultConfig)
+
+    val fetched = await(store.get(activationId, UserContext(user, defaultLogStoreHttpRequest)))
+    fetched shouldBe activation
+  }
+
+  it should "get an activation with error response" in {
+    val errorMessage = "message".toJson
+    val errorResponse = JsObject("error" -> errorMessage)
+    // The expected search request is the same for every status code, so it is built once.
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    // Stored status code paired with the activation response expected once the hit is decoded.
+    val expectations = Seq(
+      1 -> ActivationResponse.applicationError(errorMessage),
+      2 -> ActivationResponse.containerError(errorMessage),
+      3 -> ActivationResponse.whiskError(errorMessage))
+
+    for ((statusCode, expectedResponse) <- expectations) {
+      // The stored activation record as it appears under "_source" of the single hit.
+      val source = JsObject(
+        defaultConfig.schema.statusCode -> statusCode.toJson,
+        defaultConfig.schema.duration -> 101.toJson,
+        defaultConfig.schema.name -> name.toJson,
+        defaultConfig.schema.subject -> subject.toJson,
+        "waitTime" -> 16.toJson,
+        defaultConfig.schema.activationId -> activationId.toJson,
+        defaultConfig.schema.result -> errorResponse.compactPrint.toJson,
+        defaultConfig.schema.version -> "0.0.1".toJson,
+        "cause" -> JsNull,
+        defaultConfig.schema.end -> end.toEpochMilli.toJson,
+        "kind" -> "nodejs:6".toJson,
+        "logs" -> logs.toJson,
+        defaultConfig.schema.start -> start.toEpochMilli.toJson,
+        "limits" -> JsObject("timeout" -> 60000.toJson, "memory" -> 256.toJson, "logs" -> 10.toJson),
+        "initTime" -> 44.toJson,
+        defaultConfig.schema.namespace -> namespace.toJson,
+        "@version" -> "1".toJson,
+        "type" -> defaultConfig.schema.activationRecord.toJson,
+        "ALCH_TENANT_ID" -> "9cfe57a0-7ac1-4bf4-9026-d7e9e591271f".toJson // UUID
+      )
+      val hit = JsObject(
+        "_index" -> "whisk_user_logs".toJson,
+        "_type" -> defaultConfig.schema.activationRecord.toJson,
+        "_id" -> "AWUQoCrVV6WHiq7A5LL8".toJson,
+        "_score" -> 3.74084.toJson,
+        "_source" -> source)
+      val searchResult = JsObject(
+        "took" -> 4.toJson,
+        "timed_out" -> false.toJson,
+        "_shards" -> JsObject("total" -> 5.toJson, "successful" -> 5.toJson, "failed" -> 0.toJson),
+        "hits" -> JsObject("total" -> 1.toJson, "max_score" -> 3.74084.toJson, "hits" -> JsArray(hit)))
+
+      val expectedActivation = WhiskActivation(
+        namespace = namespace,
+        name = name,
+        subject,
+        activationId = activationId,
+        start = start,
+        end = end,
+        response = expectedResponse,
+        logs = expectedLogs,
+        duration = Some(101L),
+        annotations = Parameters("kind", "nodejs:6") ++ Parameters(
+          "limits",
+          ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
+          Parameters("waitTime", 16.toJson) ++
+          Parameters("initTime", 44.toJson))
+
+      val stubbedResponse =
+        HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, searchResult.compactPrint))
+      val store =
+        new ArtifactElasticSearchActivationStore(
+          system,
+          materializer,
+          logging,
+          Some(testFlow(stubbedResponse, expectedRequest)),
+          elasticSearchConfig = defaultConfig)
+
+      val fetched = await(store.get(activationId, UserContext(user, defaultLogStoreHttpRequest)))
+      fetched shouldBe expectedActivation
+    }
+  }
+
+  it should "error when getting an activation that does not exist" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    // The stub answers with a search result that contains no hits.
+    val stubbedFlow = testFlow(emptyHttpResponse, expectedRequest)
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(stubbedFlow),
+        elasticSearchConfig = defaultConfig)
+
+    a[NoDocumentException] should be thrownBy {
+      await(store.get(activationId, UserContext(user, defaultLogStoreHttpRequest)))
+    }
+  }
+
+  it should "dynamically replace $UUID when getting an activation" in {
+    // The configured path carries a %s placeholder; the expected request URI has the
+    // namespace UUID in that position.
+    val dynamicPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultGetPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = dynamicPathConfig)
+
+    val fetched = await(store.get(activation.activationId, UserContext(user, defaultLogStoreHttpRequest)))
+    fetched shouldBe activation
+  }
+
+  it should "count activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        1,
+        since = Some(since),
+        upto = Some(upto),
+        UserContext(user, defaultLogStoreHttpRequest)))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  it should "count activations in namespace with no entity name" in {
+    // With no entity name given, the expected query matches on the record type only.
+    val schema = defaultConfig.schema
+    val mustClauses = JsArray(JsObject("match" -> JsObject("_type" -> JsString(schema.activationRecord))))
+    val rangeFilters = JsArray(
+      JsObject("range" -> JsObject(schema.start -> JsObject("gt" -> JsString(since.toEpochMilli.toString)))),
+      JsObject("range" -> JsObject(schema.start -> JsObject("lt" -> JsString(upto.toEpochMilli.toString)))))
+    val payload = JsObject(
+      "query" -> JsObject("bool" -> JsObject("must" -> mustClauses, "filter" -> rangeFilters)),
+      "sort" -> JsArray(JsObject(schema.start -> JsObject("order" -> JsString("desc")))),
+      "from" -> JsNumber(1)).compactPrint
+
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, payload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        skip = 1,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  // Test description fixed: the original read "count zero activations in when ...".
+  it should "count zero activations when there are not any activations that match entity" in {
+    val httpRequest = HttpRequest(
+      POST,
+      Uri("/whisk_user_logs/_search"),
+      List(Accept(MediaTypes.`application/json`)),
+      HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    // The stub answers with a search result that contains no hits, so the count must be 0.
+    val esActivationStore =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(emptyHttpResponse, httpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    await(
+      esActivationStore.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        1,
+        since = Some(since),
+        upto = Some(upto),
+        UserContext(user, defaultLogStoreHttpRequest))) shouldBe JsObject("activations" -> JsNumber(0))
+  }
+
+  it should "dynamically replace $UUID in request when counting activations" in {
+    // The configured path carries a %s placeholder; the expected request URI has the
+    // namespace UUID in that position.
+    val dynamicPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultCountPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = dynamicPathConfig)
+
+    val counted = await(
+      store.countActivationsInNamespace(
+        user.namespace.name.toPath,
+        Some(name.toPath),
+        1,
+        since = Some(since),
+        upto = Some(upto),
+        UserContext(user, defaultLogStoreHttpRequest)))
+
+    counted shouldBe JsObject("activations" -> JsNumber(1))
+  }
+
+  it should "list activations matching entity name" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "display empty activations list when there are not any activations that match entity name" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    // The stub answers with a search result that contains no hits.
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(emptyHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List.empty)
+  }
+
+  it should "dynamically replace $UUID in request when getting activations matching entity name" in {
+    // The configured path carries a %s placeholder; the expected request URI has the
+    // namespace UUID in that position.
+    val dynamicPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListEntityPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = dynamicPathConfig)
+
+    val listed = await(
+      store.listActivationsMatchingName(
+        user.namespace.name.toPath,
+        name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "list activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "display empty activations list when there are not any activations in namespace" in {
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri("/whisk_user_logs/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    // The stub answers with a search result that contains no hits.
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(emptyHttpResponse, expectedRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List.empty)
+  }
+
+  it should "dynamically replace $UUID in request when listing activations in namespace" in {
+    // The configured path carries a %s placeholder; the expected request URI has the
+    // namespace UUID in that position.
+    val dynamicPathConfig =
+      ElasticSearchActivationStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultSchema)
+    val expectedRequest = HttpRequest(
+      method = POST,
+      uri = Uri(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search"),
+      headers = List(Accept(MediaTypes.`application/json`)),
+      entity = HttpEntity(ContentTypes.`application/json`, defaultListPayload))
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(defaultHttpResponse, expectedRequest)),
+        elasticSearchConfig = dynamicPathConfig)
+
+    val listed = await(
+      store.listActivationsInNamespace(
+        user.namespace.name.toPath,
+        1,
+        2,
+        since = Some(since),
+        upto = Some(upto),
+        context = UserContext(user, defaultLogStoreHttpRequest)))
+
+    listed shouldBe Right(List(activation, activation))
+  }
+
+  it should "forward errors from Elasticsearch" in {
+    // Every store operation below is answered with a 500 by the stubbed flow.
+    // NOTE(review): testFlow asserts that each request equals defaultHttpRequest, yet four
+    // different operations are exercised here. Confirm the RuntimeException comes from the
+    // 500 response and not from a request-mismatch assertion failing inside the flow.
+    val serverError = HttpResponse(StatusCodes.InternalServerError)
+    val store =
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        Some(testFlow(serverError, defaultHttpRequest)),
+        elasticSearchConfig = defaultConfig)
+
+    a[RuntimeException] should be thrownBy {
+      await(store.get(activation.activationId, UserContext(user, defaultLogStoreHttpRequest)))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(
+        store.listActivationsInNamespace(
+          EntityPath(""),
+          0,
+          0,
+          context = UserContext(user, defaultLogStoreHttpRequest)))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(
+        store.listActivationsMatchingName(
+          EntityPath(""),
+          EntityPath(""),
+          0,
+          0,
+          context = UserContext(user, defaultLogStoreHttpRequest)))
+    }
+    a[RuntimeException] should be thrownBy {
+      await(
+        store.countActivationsInNamespace(
+          EntityPath(""),
+          None,
+          0,
+          context = UserContext(user, defaultLogStoreHttpRequest)))
+    }
+  }
+
+  it should "fail when loading out of box configs since whisk.activationstore.elasticsearch does not exist" in {
+    // No elasticSearchConfig is passed, so construction has to obtain it on its own,
+    // which is expected to fail with a ConfigReaderException.
+    a[ConfigReaderException[_]] should be thrownBy {
+      new ArtifactElasticSearchActivationStore(system, materializer, logging)
+    }
+  }
+
+  it should "error when configuration protocol is invalid" in {
+    // "protocol" is the invalid protocol value; the remaining fields are ordinary.
+    val invalidProtocolConfig =
+      ElasticSearchActivationStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultSchema, Seq.empty)
+
+    a[IllegalArgumentException] should be thrownBy {
+      new ArtifactElasticSearchActivationStore(
+        system,
+        materializer,
+        logging,
+        elasticSearchConfig = invalidProtocolConfig)
+    }
+  }
+
+}
With regards,
Apache Git Services