This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 36acc1d Pass additional data to LogStore's fetchLogs method. (#3330)
36acc1d is described below
commit 36acc1df860423dcf4d2db0e3babf831209e6d69
Author: James Dubee <[email protected]>
AuthorDate: Fri Feb 23 14:42:47 2018 -0500
Pass additional data to LogStore's fetchLogs method. (#3330)
---
.../logging/DockerToActivationFileLogStore.scala | 7 ++---
.../logging/DockerToActivationLogStore.scala | 8 ++++--
.../containerpool/logging/LogDriverLogStore.scala | 6 +++--
.../core/containerpool/logging/LogStore.scala | 4 ++-
.../containerpool/logging/SplunkLogStore.scala | 8 +++++-
.../scala/whisk/core/controller/Activations.scala | 22 +++++++++-------
.../logging/SplunkLogStoreTests.scala | 30 +++++++++++++++++-----
7 files changed, 61 insertions(+), 24 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index f9ec413..4be36a7 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -18,6 +18,7 @@
package whisk.core.containerpool.logging
import java.nio.file.{Path, Paths}
+import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
@@ -25,15 +26,15 @@ import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink,
Source}
import akka.util.ByteString
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity,
WhiskActivation}
import whisk.core.entity.size._
+import whisk.http.Messages
+
import spray.json._
import spray.json.DefaultJsonProtocol._
-import java.time.Instant
-
-import whisk.http.Messages
import scala.concurrent.Future
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 153aa59..b4e3983 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -19,18 +19,21 @@ package whisk.core.containerpool.logging
import akka.NotUsed
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Flow
import akka.util.ByteString
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity,
WhiskActivation}
-import spray.json._
import whisk.http.Messages
import scala.concurrent.{ExecutionContext, Future}
+import spray.json._
+
/**
* Represents a single log line as read from a docker log
*/
@@ -64,7 +67,8 @@ class DockerToActivationLogStore(system: ActorSystem) extends
LogStore {
override val containerParameters = Map("--log-driver" -> Set("json-file"))
/* As logs are already part of the activation record, just return that bit
of it */
- override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
= Future.successful(activation.logs)
+ override def fetchLogs(user: Identity, activation: WhiskActivation, request:
HttpRequest): Future[ActivationLogs] =
+ Future.successful(activation.logs)
override def collectLogs(transid: TransactionId,
user: Identity,
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 465fb25..ea1576c 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -18,10 +18,12 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
import whisk.core.entity.Identity
import whisk.common.TransactionId
import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction,
WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity,
WhiskActivation}
import scala.concurrent.Future
@@ -47,7 +49,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends
LogStore {
/** no logs exposed to API/CLI using only the LogDriverLogStore; use an
extended version,
* e.g. the SplunkLogStore to expose logs from some external source */
- def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
+ def fetchLogs(user: Identity, activation: WhiskActivation, request:
HttpRequest): Future[ActivationLogs] =
Future.successful(ActivationLogs(Vector("Logs are not available.")))
}
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 335eed5..28c5b9e 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -18,6 +18,8 @@
package whisk.core.containerpool.logging
import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
import whisk.common.TransactionId
import whisk.core.containerpool.Container
import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity,
WhiskActivation}
@@ -74,7 +76,7 @@ trait LogStore {
* @param activation activation to fetch the logs for
* @return the relevant logs
*/
- def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
+ def fetchLogs(user: Identity, activation: WhiskActivation, request:
HttpRequest): Future[ActivationLogs]
}
trait LogStoreProvider extends Spi {
diff --git
a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
index 596b776..694fc9a 100644
---
a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
+++
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -36,18 +36,24 @@ import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
+
import com.typesafe.sslconfig.akka.AkkaSSLConfig
+
import pureconfig._
+
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
+
import spray.json._
+
import whisk.common.AkkaLogging
import whisk.core.ConfigKeys
import whisk.core.entity.ActivationLogs
import whisk.core.entity.WhiskActivation
+import whisk.core.entity.Identity
case class SplunkLogStoreConfig(host: String,
port: Int,
@@ -92,7 +98,7 @@ class SplunkLogStore(
Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s =>
s.withLoose(s.loose.withDisableSNI(true))))
else Http().defaultClientHttpsContext)
- override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
= {
+ override def fetchLogs(user: Identity, activation: WhiskActivation, request:
HttpRequest): Future[ActivationLogs] = {
//example curl request:
// curl -u username:password -k
https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d
output_mode=json -d "search=search index=\"someindex\" | spath=activation_id |
search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d
"earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
diff --git
a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index f7ea7b8..6cc4ef7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -26,6 +26,7 @@ import
akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarsha
import akka.http.scaladsl.model.StatusCodes.BadRequest
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.unmarshalling._
+
import spray.json._
import spray.json.DefaultJsonProtocol.RootJsObjectFormat
import whisk.common.TransactionId
@@ -120,7 +121,7 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
resource.entity match {
case Some(ActivationId(id)) =>
op match {
- case READ => fetch(resource.namespace, id)
+ case READ => fetch(user, resource.namespace, id)
case _ => reject // should not get here
}
case None =>
@@ -201,7 +202,8 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetch(namespace: EntityPath, activationId:
ActivationId)(implicit transid: TransactionId) = {
+ private def fetch(user: Identity, namespace: EntityPath, activationId:
ActivationId)(
+ implicit transid: TransactionId) = {
val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
pathEndOrSingleSlash {
getEntity(
@@ -211,7 +213,7 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
postProcess = Some((activation: WhiskActivation) =>
complete(activation.toExtendedJson)))
} ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
- (pathPrefix(logsPath) & pathEnd) { fetchLogs(docid) }
+ (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
}
/**
@@ -238,11 +240,13 @@ trait WhiskActivationsApi extends Directives with
AuthenticatedRouteProvider wit
* - 404 Not Found
* - 500 Internal Server Error
*/
- private def fetchLogs(docid: DocId)(implicit transid: TransactionId) = {
- getEntityAndProject(
- WhiskActivation,
- activationStore,
- docid,
- (activation: WhiskActivation) =>
logStore.fetchLogs(activation).map(_.toJsonObject))
+ private def fetchLogs(user: Identity, docid: DocId)(implicit transid:
TransactionId) = {
+ extractRequest { request =>
+ getEntityAndProject(
+ WhiskActivation,
+ activationStore,
+ docid,
+ (activation: WhiskActivation) => logStore.fetchLogs(user, activation,
request).map(_.toJsonObject))
+ }
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
index f25a49c..5c12579 100644
---
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -26,30 +26,39 @@ import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.HttpMethods.POST
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.model.MediaTypes
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
+
import common.StreamLogging
+
import java.time.ZonedDateTime
+
+import pureconfig.error.ConfigReaderException
+
import org.junit.runner.RunWith
import org.scalatest.Matchers
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
-import scala.util.Failure
-import whisk.core.entity.ActivationLogs
import org.scalatest.FlatSpecLike
-import pureconfig.error.ConfigReaderException
+
import scala.concurrent.Await
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
+import scala.util.Failure
+
import spray.json.JsNumber
import spray.json.JsObject
import spray.json._
+
import whisk.core.entity.ActionLimits
import whisk.core.entity.ActivationId
import whisk.core.entity.ActivationResponse
@@ -62,6 +71,9 @@ import whisk.core.entity.Subject
import whisk.core.entity.TimeLimit
import whisk.core.entity.WhiskActivation
import whisk.core.entity.size._
+import whisk.core.entity.AuthKey
+import whisk.core.entity.Identity
+import whisk.core.entity.ActivationLogs
@RunWith(classOf[JUnitRunner])
class SplunkLogStoreTests
@@ -85,6 +97,12 @@ class SplunkLogStoreTests
val startTime = "2007-12-03T10:15:30Z"
val endTime = "2007-12-03T10:15:45Z"
val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is
endTime+5
+ val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+ val request = HttpRequest(
+ method = POST,
+ uri = "https://some.url",
+ headers = List(RawHeader("key", "value")),
+ entity = HttpEntity(MediaTypes.`application/json`,
JsObject().compactPrint))
val activation = WhiskActivation(
namespace = EntityPath("ns"),
@@ -155,14 +173,14 @@ class SplunkLogStoreTests
it should "find logs based on activation timestamps" in {
//use the a flow that asserts the request structure and provides a
response in the expected format
val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
- val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+ val result = Await.result(splunkStore.fetchLogs(user, activation,
request), 1.second)
result shouldBe ActivationLogs(Vector("some log message", "some other log
message"))
}
it should "fail to connect to bogus host" in {
//use the default http flow with the default bogus-host config
val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
- val result = splunkStore.fetchLogs(activation)
+ val result = splunkStore.fetchLogs(user, activation, request)
whenReady(result.failed, Timeout(1.second)) { ex =>
ex shouldBe an[StreamTcpException]
}
@@ -170,7 +188,7 @@ class SplunkLogStoreTests
it should "display an error if API cannot be reached" in {
//use a flow that generates a 500 response
val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
- val result = splunkStore.fetchLogs(activation)
+ val result = splunkStore.fetchLogs(user, activation, request)
whenReady(result.failed, Timeout(1.second)) { ex =>
ex shouldBe an[RuntimeException]
}
--
To stop receiving notification emails like this one, please contact
[email protected].