This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 36acc1d  Pass additional data to LogStore's fetchLogs method. (#3330)
36acc1d is described below

commit 36acc1df860423dcf4d2db0e3babf831209e6d69
Author: James Dubee <[email protected]>
AuthorDate: Fri Feb 23 14:42:47 2018 -0500

    Pass additional data to LogStore's fetchLogs method. (#3330)
---
 .../logging/DockerToActivationFileLogStore.scala   |  7 ++---
 .../logging/DockerToActivationLogStore.scala       |  8 ++++--
 .../containerpool/logging/LogDriverLogStore.scala  |  6 +++--
 .../core/containerpool/logging/LogStore.scala      |  4 ++-
 .../containerpool/logging/SplunkLogStore.scala     |  8 +++++-
 .../scala/whisk/core/controller/Activations.scala  | 22 +++++++++-------
 .../logging/SplunkLogStoreTests.scala              | 30 +++++++++++++++++-----
 7 files changed, 61 insertions(+), 24 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index f9ec413..4be36a7 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.logging
 
 import java.nio.file.{Path, Paths}
+import java.time.Instant
 
 import akka.NotUsed
 import akka.actor.ActorSystem
@@ -25,15 +26,15 @@ import akka.stream.alpakka.file.scaladsl.LogRotatorSink
 import akka.stream.{Graph, SinkShape, UniformFanOutShape}
 import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, 
Source}
 import akka.util.ByteString
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
 import whisk.core.entity.size._
+import whisk.http.Messages
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-import java.time.Instant
-
-import whisk.http.Messages
 
 import scala.concurrent.Future
 
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
index 153aa59..b4e3983 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala
@@ -19,18 +19,21 @@ package whisk.core.containerpool.logging
 
 import akka.NotUsed
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Flow
 import akka.util.ByteString
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
-import spray.json._
 import whisk.http.Messages
 
 import scala.concurrent.{ExecutionContext, Future}
 
+import spray.json._
+
 /**
  * Represents a single log line as read from a docker log
  */
@@ -64,7 +67,8 @@ class DockerToActivationLogStore(system: ActorSystem) extends 
LogStore {
   override val containerParameters = Map("--log-driver" -> Set("json-file"))
 
   /* As logs are already part of the activation record, just return that bit 
of it */
-  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs)
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+    Future.successful(activation.logs)
 
   override def collectLogs(transid: TransactionId,
                            user: Identity,
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 465fb25..ea1576c 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -18,10 +18,12 @@
 package whisk.core.containerpool.logging
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
 import whisk.core.entity.Identity
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation}
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
 
 import scala.concurrent.Future
 
@@ -47,7 +49,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends 
LogStore {
 
   /** no logs exposed to API/CLI using only the LogDriverLogStore; use an 
extended version,
    * e.g. the SplunkLogStore to expose logs from some external source */
-  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
+  def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
     Future.successful(ActivationLogs(Vector("Logs are not available.")))
 }
 
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
index 335eed5..28c5b9e 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala
@@ -18,6 +18,8 @@
 package whisk.core.containerpool.logging
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
+
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
 import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
@@ -74,7 +76,7 @@ trait LogStore {
    * @param activation activation to fetch the logs for
    * @return the relevant logs
    */
-  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs]
+  def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs]
 }
 
 trait LogStoreProvider extends Spi {
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
index 596b776..694fc9a 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -36,18 +36,24 @@ import akka.stream.scaladsl.Flow
 import akka.stream.scaladsl.Keep
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
+
 import com.typesafe.sslconfig.akka.AkkaSSLConfig
+
 import pureconfig._
+
 import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
+
 import spray.json._
+
 import whisk.common.AkkaLogging
 import whisk.core.ConfigKeys
 import whisk.core.entity.ActivationLogs
 import whisk.core.entity.WhiskActivation
+import whisk.core.entity.Identity
 
 case class SplunkLogStoreConfig(host: String,
                                 port: Int,
@@ -92,7 +98,7 @@ class SplunkLogStore(
         Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => 
s.withLoose(s.loose.withDisableSNI(true))))
       else Http().defaultClientHttpsContext)
 
-  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = {
 
     //example curl request:
     //    curl -u  username:password -k 
https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d 
output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | 
search activation_id=a930e5ae4ad4455c8f2505d665aad282 |  table log_message" -d 
"earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/Activations.scala 
b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
index f7ea7b8..6cc4ef7 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala
@@ -26,6 +26,7 @@ import 
akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarsha
 import akka.http.scaladsl.model.StatusCodes.BadRequest
 import akka.http.scaladsl.server.Directives
 import akka.http.scaladsl.unmarshalling._
+
 import spray.json._
 import spray.json.DefaultJsonProtocol.RootJsObjectFormat
 import whisk.common.TransactionId
@@ -120,7 +121,7 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
     resource.entity match {
       case Some(ActivationId(id)) =>
         op match {
-          case READ => fetch(resource.namespace, id)
+          case READ => fetch(user, resource.namespace, id)
           case _    => reject // should not get here
         }
       case None =>
@@ -201,7 +202,8 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  private def fetch(namespace: EntityPath, activationId: ActivationId)(implicit transid: TransactionId) = {
+  private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)(
+    implicit transid: TransactionId) = {
     val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId))
     pathEndOrSingleSlash {
       getEntity(
@@ -211,7 +213,7 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
         postProcess = Some((activation: WhiskActivation) => 
complete(activation.toExtendedJson)))
 
     } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~
-      (pathPrefix(logsPath) & pathEnd) { fetchLogs(docid) }
+      (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) }
   }
 
   /**
@@ -238,11 +240,13 @@ trait WhiskActivationsApi extends Directives with 
AuthenticatedRouteProvider wit
    * - 404 Not Found
    * - 500 Internal Server Error
    */
-  private def fetchLogs(docid: DocId)(implicit transid: TransactionId) = {
-    getEntityAndProject(
-      WhiskActivation,
-      activationStore,
-      docid,
-      (activation: WhiskActivation) => logStore.fetchLogs(activation).map(_.toJsonObject))
+  private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = {
+    extractRequest { request =>
+      getEntityAndProject(
+        WhiskActivation,
+        activationStore,
+        docid,
+        (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject))
+    }
   }
 }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
index f25a49c..5c12579 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -26,30 +26,39 @@ import akka.http.scaladsl.model.HttpEntity
 import akka.http.scaladsl.model.HttpRequest
 import akka.http.scaladsl.model.HttpResponse
 import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model.HttpMethods.POST
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.model.MediaTypes
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
 import akka.stream.StreamTcpException
 import akka.stream.scaladsl.Flow
 import akka.testkit.TestKit
+
 import common.StreamLogging
+
 import java.time.ZonedDateTime
+
+import pureconfig.error.ConfigReaderException
+
 import org.junit.runner.RunWith
 import org.scalatest.Matchers
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
-import scala.util.Failure
-import whisk.core.entity.ActivationLogs
 import org.scalatest.FlatSpecLike
-import pureconfig.error.ConfigReaderException
+
 import scala.concurrent.Await
 import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.Success
 import scala.util.Try
+import scala.util.Failure
+
 import spray.json.JsNumber
 import spray.json.JsObject
 import spray.json._
+
 import whisk.core.entity.ActionLimits
 import whisk.core.entity.ActivationId
 import whisk.core.entity.ActivationResponse
@@ -62,6 +71,9 @@ import whisk.core.entity.Subject
 import whisk.core.entity.TimeLimit
 import whisk.core.entity.WhiskActivation
 import whisk.core.entity.size._
+import whisk.core.entity.AuthKey
+import whisk.core.entity.Identity
+import whisk.core.entity.ActivationLogs
 
 @RunWith(classOf[JUnitRunner])
 class SplunkLogStoreTests
@@ -85,6 +97,12 @@ class SplunkLogStoreTests
   val startTime = "2007-12-03T10:15:30Z"
   val endTime = "2007-12-03T10:15:45Z"
   val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is 
endTime+5
+  val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set())
+  val request = HttpRequest(
+    method = POST,
+    uri = "https://some.url",
+    headers = List(RawHeader("key", "value")),
+    entity = HttpEntity(MediaTypes.`application/json`, JsObject().compactPrint))
 
   val activation = WhiskActivation(
     namespace = EntityPath("ns"),
@@ -155,14 +173,14 @@ class SplunkLogStoreTests
   it should "find logs based on activation timestamps" in {
     //use the a flow that asserts the request structure and provides a 
response in the expected format
     val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
-    val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+    val result = Await.result(splunkStore.fetchLogs(user, activation, request), 1.second)
     result shouldBe ActivationLogs(Vector("some log message", "some other log 
message"))
   }
 
   it should "fail to connect to bogus host" in {
     //use the default http flow with the default bogus-host config
     val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
-    val result = splunkStore.fetchLogs(activation)
+    val result = splunkStore.fetchLogs(user, activation, request)
     whenReady(result.failed, Timeout(1.second)) { ex =>
       ex shouldBe an[StreamTcpException]
     }
@@ -170,7 +188,7 @@ class SplunkLogStoreTests
   it should "display an error if API cannot be reached" in {
     //use a flow that generates a 500 response
     val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
-    val result = splunkStore.fetchLogs(activation)
+    val result = splunkStore.fetchLogs(user, activation, request)
     whenReady(result.failed, Timeout(1.second)) { ex =>
       ex shouldBe an[RuntimeException]
     }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to