markusthoemmes closed pull request #3330: Update Log Store Fetch URL: https://github.com/apache/incubator-openwhisk/pull/3330
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala index f9ec413c38..4be36a730a 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -18,6 +18,7 @@ package whisk.core.containerpool.logging import java.nio.file.{Path, Paths} +import java.time.Instant import akka.NotUsed import akka.actor.ActorSystem @@ -25,15 +26,15 @@ import akka.stream.alpakka.file.scaladsl.LogRotatorSink import akka.stream.{Graph, SinkShape, UniformFanOutShape} import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source} import akka.util.ByteString + import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} import whisk.core.entity.size._ +import whisk.http.Messages + import spray.json._ import spray.json.DefaultJsonProtocol._ -import java.time.Instant - -import whisk.http.Messages import scala.concurrent.Future diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala index 153aa59c67..b4e3983af4 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationLogStore.scala @@ -19,18 +19,21 @@ package 
whisk.core.containerpool.logging import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpRequest import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Flow import akka.util.ByteString + import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} -import spray.json._ import whisk.http.Messages import scala.concurrent.{ExecutionContext, Future} +import spray.json._ + /** * Represents a single log line as read from a docker log */ @@ -64,7 +67,8 @@ class DockerToActivationLogStore(system: ActorSystem) extends LogStore { override val containerParameters = Map("--log-driver" -> Set("json-file")) /* As logs are already part of the activation record, just return that bit of it */ - override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = Future.successful(activation.logs) + override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = + Future.successful(activation.logs) override def collectLogs(transid: TransactionId, user: Identity, diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala index 465fb2532f..ea1576cc12 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala @@ -18,10 +18,12 @@ package whisk.core.containerpool.logging import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpRequest + import whisk.core.entity.Identity import whisk.common.TransactionId import whisk.core.containerpool.Container -import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, WhiskActivation} +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
Identity, WhiskActivation} import scala.concurrent.Future @@ -47,7 +49,7 @@ class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore { /** no logs exposed to API/CLI using only the LogDriverLogStore; use an extended version, * e.g. the SplunkLogStore to expose logs from some external source */ - def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = + def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = Future.successful(ActivationLogs(Vector("Logs are not available."))) } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala index 335eed5d3e..28c5b9e93f 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogStore.scala @@ -18,6 +18,8 @@ package whisk.core.containerpool.logging import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpRequest + import whisk.common.TransactionId import whisk.core.containerpool.Container import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} @@ -74,7 +76,7 @@ trait LogStore { * @param activation activation to fetch the logs for * @return the relevant logs */ - def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] + def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] } trait LogStoreProvider extends Spi { diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala index 596b776131..694fc9a3f1 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala @@ -36,18 +36,24 @@ import 
akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source + import com.typesafe.sslconfig.akka.AkkaSSLConfig + import pureconfig._ + import scala.concurrent.Future import scala.concurrent.Promise import scala.util.Failure import scala.util.Success import scala.util.Try + import spray.json._ + import whisk.common.AkkaLogging import whisk.core.ConfigKeys import whisk.core.entity.ActivationLogs import whisk.core.entity.WhiskActivation +import whisk.core.entity.Identity case class SplunkLogStoreConfig(host: String, port: Int, @@ -92,7 +98,7 @@ class SplunkLogStore( Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true)))) else Http().defaultClientHttpsContext) - override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = { + override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = { //example curl request: // curl -u username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 | table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00" diff --git a/core/controller/src/main/scala/whisk/core/controller/Activations.scala b/core/controller/src/main/scala/whisk/core/controller/Activations.scala index f7ea7b827f..6cc4ef73c4 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Activations.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Activations.scala @@ -26,6 +26,7 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarsha import akka.http.scaladsl.model.StatusCodes.BadRequest import akka.http.scaladsl.server.Directives import akka.http.scaladsl.unmarshalling._ + import spray.json._ import 
spray.json.DefaultJsonProtocol.RootJsObjectFormat import whisk.common.TransactionId @@ -120,7 +121,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit resource.entity match { case Some(ActivationId(id)) => op match { - case READ => fetch(resource.namespace, id) + case READ => fetch(user, resource.namespace, id) case _ => reject // should not get here } case None => @@ -201,7 +202,8 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 404 Not Found * - 500 Internal Server Error */ - private def fetch(namespace: EntityPath, activationId: ActivationId)(implicit transid: TransactionId) = { + private def fetch(user: Identity, namespace: EntityPath, activationId: ActivationId)( + implicit transid: TransactionId) = { val docid = DocId(WhiskEntity.qualifiedName(namespace, activationId)) pathEndOrSingleSlash { getEntity( @@ -211,7 +213,7 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit postProcess = Some((activation: WhiskActivation) => complete(activation.toExtendedJson))) } ~ (pathPrefix(resultPath) & pathEnd) { fetchResponse(docid) } ~ - (pathPrefix(logsPath) & pathEnd) { fetchLogs(docid) } + (pathPrefix(logsPath) & pathEnd) { fetchLogs(user, docid) } } /** @@ -238,11 +240,13 @@ trait WhiskActivationsApi extends Directives with AuthenticatedRouteProvider wit * - 404 Not Found * - 500 Internal Server Error */ - private def fetchLogs(docid: DocId)(implicit transid: TransactionId) = { - getEntityAndProject( - WhiskActivation, - activationStore, - docid, - (activation: WhiskActivation) => logStore.fetchLogs(activation).map(_.toJsonObject)) + private def fetchLogs(user: Identity, docid: DocId)(implicit transid: TransactionId) = { + extractRequest { request => + getEntityAndProject( + WhiskActivation, + activationStore, + docid, + (activation: WhiskActivation) => logStore.fetchLogs(user, activation, request).map(_.toJsonObject)) + } } } diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala index f25a49c06c..5c125795d9 100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala @@ -26,30 +26,39 @@ import akka.http.scaladsl.model.HttpEntity import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.HttpMethods.POST +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.model.MediaTypes import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer import akka.stream.StreamTcpException import akka.stream.scaladsl.Flow import akka.testkit.TestKit + import common.StreamLogging + import java.time.ZonedDateTime + +import pureconfig.error.ConfigReaderException + import org.junit.runner.RunWith import org.scalatest.Matchers import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner -import scala.util.Failure -import whisk.core.entity.ActivationLogs import org.scalatest.FlatSpecLike -import pureconfig.error.ConfigReaderException + import scala.concurrent.Await import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Success import scala.util.Try +import scala.util.Failure + import spray.json.JsNumber import spray.json.JsObject import spray.json._ + import whisk.core.entity.ActionLimits import whisk.core.entity.ActivationId import whisk.core.entity.ActivationResponse @@ -62,6 +71,9 @@ import whisk.core.entity.Subject import whisk.core.entity.TimeLimit import whisk.core.entity.WhiskActivation import whisk.core.entity.size._ +import whisk.core.entity.AuthKey +import whisk.core.entity.Identity +import 
whisk.core.entity.ActivationLogs @RunWith(classOf[JUnitRunner]) class SplunkLogStoreTests @@ -85,6 +97,12 @@ class SplunkLogStoreTests val startTime = "2007-12-03T10:15:30Z" val endTime = "2007-12-03T10:15:45Z" val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5 + val user = Identity(Subject(), EntityName("testSpace"), AuthKey(), Set()) + val request = HttpRequest( + method = POST, + uri = "https://some.url", + headers = List(RawHeader("key", "value")), + entity = HttpEntity(MediaTypes.`application/json`, JsObject().compactPrint)) val activation = WhiskActivation( namespace = EntityPath("ns"), @@ -155,14 +173,14 @@ class SplunkLogStoreTests it should "find logs based on activation timestamps" in { //use the a flow that asserts the request structure and provides a response in the expected format val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig) - val result = Await.result(splunkStore.fetchLogs(activation), 1.second) + val result = Await.result(splunkStore.fetchLogs(user, activation, request), 1.second) result shouldBe ActivationLogs(Vector("some log message", "some other log message")) } it should "fail to connect to bogus host" in { //use the default http flow with the default bogus-host config val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig) - val result = splunkStore.fetchLogs(activation) + val result = splunkStore.fetchLogs(user, activation, request) whenReady(result.failed, Timeout(1.second)) { ex => ex shouldBe an[StreamTcpException] } @@ -170,7 +188,7 @@ class SplunkLogStoreTests it should "display an error if API cannot be reached" in { //use a flow that generates a 500 response val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig) - val result = splunkStore.fetchLogs(activation) + val result = splunkStore.fetchLogs(user, activation, request) whenReady(result.failed, Timeout(1.second)) { ex => ex shouldBe an[RuntimeException] } 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services