This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1d1a1  Adding implementation of a Splunk-backed LogStore (#2957)
6f1d1a1 is described below

commit 6f1d1a18bd53c288a1bad9de740f664ea595b4e5
Author: tysonnorris <tysonnor...@gmail.com>
AuthorDate: Tue Feb 13 10:57:28 2018 -0800

    Adding implementation of a Splunk-backed LogStore (#2957)
    
    Co-authored-by: mcdan <m...@danmcweeney.com>
---
 .../src/main/scala/whisk/core/WhiskConfig.scala    |   3 +
 .../containerpool/logging/LogDriverLogStore.scala  |  56 +++++++
 .../containerpool/logging/SplunkLogStore.scala     | 160 +++++++++++++++++++
 .../logging/LogDriverLogStoreTests.scala           |  38 +++++
 .../logging/SplunkLogStoreTests.scala              | 177 +++++++++++++++++++++
 5 files changed, 434 insertions(+)

diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5d3fada..c538d6e 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -258,4 +258,7 @@ object ConfigKeys {
 
   val transactions = "whisk.transactions"
   val stride = s"$transactions.stride"
+
+  val logStore = "whisk.logstore"
+  val splunk = s"$logStore.splunk"
 }
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
new file mode 100644
index 0000000..465fb25
--- /dev/null
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import whisk.core.entity.Identity
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
WhiskActivation}
+
+import scala.concurrent.Future
+
+/**
+ * Docker log driver based LogStore impl. Uses docker log driver to emit 
container logs to an external store.
+ * Fetching logs from that external store is not provided in this trait. This 
SPI requires the
+ * ContainerArgs.extraArgs to be used to indicate where the logs are shipped.
+ * see 
https://docs.docker.com/config/containers/logging/configure/#configure-the-logging-driver-for-a-container
+ *
+ * Fetching logs here is a NOOP, but extended versions can customize fetching, 
e.g. from ELK or Splunk etc.
+ */
+class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
+
+  /** Indicate --log-driver and --log-opt flags via 
ContainerArgsConfig.extraArgs */
+  override def containerParameters = Map()
+
+  def collectLogs(transid: TransactionId,
+                  user: Identity,
+                  activation: WhiskActivation,
+                  container: Container,
+                  action: ExecutableWhiskAction): Future[ActivationLogs] =
+    Future.successful(ActivationLogs()) //no logs collected when using docker 
log drivers (see DockerLogStore for json-file exception)
+
+  /** no logs exposed to API/CLI using only the LogDriverLogStore; use an 
extended version,
+   * e.g. the SplunkLogStore to expose logs from some external source */
+  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
+    Future.successful(ActivationLogs(Vector("Logs are not available.")))
+}
+
+object LogDriverLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem) = new 
LogDriverLogStore(actorSystem)
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
new file mode 100644
index 0000000..596b776
--- /dev/null
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.client.RequestBuilding.Post
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import com.typesafe.sslconfig.akka.AkkaSSLConfig
+import pureconfig._
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import spray.json._
+import whisk.common.AkkaLogging
+import whisk.core.ConfigKeys
+import whisk.core.entity.ActivationLogs
+import whisk.core.entity.WhiskActivation
+
+case class SplunkLogStoreConfig(host: String,
+                                port: Int,
+                                username: String,
+                                password: String,
+                                index: String,
+                                logMessageField: String,
+                                activationIdField: String,
+                                disableSNI: Boolean)
+case class SplunkResponse(results: Vector[JsObject])
+object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
+  implicit val orderFormat = jsonFormat1(SplunkResponse)
+}
+
+/**
+ * A Splunk based impl of LogDriverLogStore. Logs are routed to splunk via 
docker log driver, and retrieved via Splunk REST API
+ *
+ * @param actorSystem
+ * @param httpFlow Optional Flow to use for HttpRequest handling (to enable 
stream based tests)
+ */
+class SplunkLogStore(
+  actorSystem: ActorSystem,
+  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), 
(Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
+  splunkConfig: SplunkLogStoreConfig = 
loadConfigOrThrow[SplunkLogStoreConfig](ConfigKeys.splunk))
+    extends LogDriverLogStore(actorSystem) {
+  implicit val as = actorSystem
+  implicit val ec = as.dispatcher
+  implicit val materializer = ActorMaterializer()
+  private val logging = new AkkaLogging(actorSystem.log)
+
+  private val splunkApi = Path / "services" / "search" / "jobs" //see 
http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
+
+  import SplunkResponseJsonProtocol._
+
+  val maxPendingRequests = 500
+
+  val defaultHttpFlow = 
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
+    host = splunkConfig.host,
+    port = splunkConfig.port,
+    connectionContext =
+      if (splunkConfig.disableSNI)
+        Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => 
s.withLoose(s.loose.withDisableSNI(true))))
+      else Http().defaultClientHttpsContext)
+
+  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] 
= {
+
+    //example curl request:
+    //    curl -u  username:password -k 
https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d 
output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | 
search activation_id=a930e5ae4ad4455c8f2505d665aad282 |  table log_message" -d 
"earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
+    //example response:
+    //    
{"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some
 log message"}], "highlighted":{}}
+    //note: splunk returns results in reverse-chronological order, therefore 
we include "| reverse" to cause results to arrive in chronological order
+    val search =
+      s"""search index="${splunkConfig.index}"| spath 
${splunkConfig.activationIdField}| search 
${splunkConfig.activationIdField}=${activation.activationId.toString}| table 
${splunkConfig.logMessageField}| reverse"""
+
+    val entity = FormData(
+      Map(
+        "exec_mode" -> "oneshot",
+        "search" -> search,
+        "output_mode" -> "json",
+        "earliest_time" -> activation.start.toString, //assume that activation 
start/end are UTC zone, and splunk events are the same
+        "latest_time" -> activation.end
+          .plusSeconds(5) //add 5s to avoid a timerange of 0 on short-lived 
activations
+          .toString)).toEntity
+
+    logging.debug(this, "sending request")
+    queueRequest(
+      Post(Uri(path = splunkApi))
+        .withEntity(entity)
+        
.withHeaders(List(Authorization(BasicHttpCredentials(splunkConfig.username, 
splunkConfig.password)))))
+      .flatMap(response => {
+        logging.debug(this, s"splunk API response ${response}")
+        Unmarshal(response.entity)
+          .to[SplunkResponse]
+          .map(r => {
+            ActivationLogs(
+              r.results
+                .map(_.fields(splunkConfig.logMessageField).convertTo[String]))
+          })
+      })
+  }
+
+  //based on 
http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html
+  val queue =
+    Source
+      .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, 
OverflowStrategy.dropNew)
+      .via(httpFlow.getOrElse(defaultHttpFlow))
+      .toMat(Sink.foreach({
+        case ((Success(resp), p)) => p.success(resp)
+        case ((Failure(e), p))    => p.failure(e)
+      }))(Keep.left)
+      .run()
+
+  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
+    val responsePromise = Promise[HttpResponse]()
+    queue.offer(request -> responsePromise).flatMap {
+      case QueueOfferResult.Enqueued => responsePromise.future
+      case QueueOfferResult.Dropped =>
+        Future.failed(new RuntimeException("Splunk API Client Queue 
overflowed. Try again later."))
+      case QueueOfferResult.Failure(ex) => Future.failed(ex)
+      case QueueOfferResult.QueueClosed =>
+        Future.failed(
+          new RuntimeException(
+            "Splunk API Client Queue was closed (pool shut down) while running 
the request. Try again later."))
+    }
+  }
+}
+
+object SplunkLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem) = new 
SplunkLogStore(actorSystem)
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
new file mode 100644
index 0000000..b87e423
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import whisk.core.containerpool.ContainerArgsConfig
+
+class LogDriverLogStoreTests extends TestKit(ActorSystem("LogDriverLogStore")) 
with FlatSpecLike with Matchers {
+
+  val testConfig = ContainerArgsConfig(
+    network = "network",
+    extraArgs =
+      Map("log-driver" -> Set("fluentd"), "log-opt" -> 
Set("fluentd-address=localhost:24225", "tag=OW_CONTAINER")))
+  behavior of "LogDriver LogStore"
+
+  it should "set the container parameters from the config" in {
+    val logDriverLogStore = new LogDriverLogStore(system)
+    logDriverLogStore.containerParameters shouldBe Map()
+  }
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
new file mode 100644
index 0000000..08faa47
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.javadsl.model.headers.Authorization
+import akka.http.scaladsl.model.ContentTypes
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpEntity
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.StreamTcpException
+import akka.stream.scaladsl.Flow
+import akka.testkit.TestKit
+import common.StreamLogging
+import java.time.ZonedDateTime
+import org.scalatest.Matchers
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.ScalaFutures
+import scala.util.Failure
+import whisk.core.entity.ActivationLogs
+import org.scalatest.FlatSpecLike
+import pureconfig.error.ConfigReaderException
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+import scala.util.Success
+import scala.util.Try
+import spray.json.JsNumber
+import spray.json.JsObject
+import spray.json._
+import whisk.core.entity.ActionLimits
+import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.EntityName
+import whisk.core.entity.EntityPath
+import whisk.core.entity.LogLimit
+import whisk.core.entity.MemoryLimit
+import whisk.core.entity.Parameters
+import whisk.core.entity.Subject
+import whisk.core.entity.TimeLimit
+import whisk.core.entity.WhiskActivation
+import whisk.core.entity.size._
+
+class SplunkLogStoreTests
+    extends TestKit(ActorSystem("SplunkLogStore"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+  val testConfig = SplunkLogStoreConfig(
+    "splunk-host",
+    8080,
+    "splunk-user",
+    "splunk-pass",
+    "splunk-index",
+    "log_message",
+    "activation_id",
+    false)
+
+  behavior of "Splunk LogStore"
+
+  val startTime = "2007-12-03T10:15:30Z"
+  val endTime = "2007-12-03T10:15:45Z"
+  val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is 
endTime+5
+
+  val activation = WhiskActivation(
+    namespace = EntityPath("ns"),
+    name = EntityName("a"),
+    Subject(),
+    activationId = ActivationId(),
+    start = ZonedDateTime.parse(startTime).toInstant,
+    end = ZonedDateTime.parse(endTime).toInstant,
+    response = ActivationResponse.success(Some(JsObject("res" -> 
JsNumber(1)))),
+    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), 
MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
+    duration = Some(123))
+
+  implicit val ec = system.dispatcher
+  implicit val materializer = ActorMaterializer()
+
+  val testFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], 
Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          //we use cachedHostConnectionPoolHttps so won't get the host+port 
with the request
+          Unmarshal(request.entity)
+            .to[FormData]
+            .map { form =>
+              val earliestTime = form.fields.get("earliest_time")
+              val latestTime = form.fields.get("latest_time")
+              val outputMode = form.fields.get("output_mode")
+              val search = form.fields.get("search")
+              val execMode = form.fields.get("exec_mode")
+
+              request.uri.path.toString() shouldBe "/services/search/jobs"
+              request.headers shouldBe 
List(Authorization.basic(testConfig.username, testConfig.password))
+              earliestTime shouldBe Some(startTime)
+              latestTime shouldBe Some(endTimePlus5)
+              outputMode shouldBe Some("json")
+              execMode shouldBe Some("oneshot")
+              search shouldBe Some(
+                s"""search index="${testConfig.index}"| spath 
${testConfig.activationIdField}| search 
${testConfig.activationIdField}=${activation.activationId.toString}| table 
${testConfig.logMessageField}| reverse""")
+
+              (
+                Success(
+                  HttpResponse(
+                    StatusCodes.OK,
+                    entity = HttpEntity(
+                      ContentTypes.`application/json`,
+                      
"""{"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some
 log message"},{"log_message":"some other log message"}], 
"highlighted":{}}"""))),
+                userContext)
+            }
+            .recover {
+              case e =>
+                println("failed")
+                (Failure(e), userContext)
+            }
+      }
+  val failFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], 
Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .map {
+        case (request, userContext) =>
+          (Success(HttpResponse(StatusCodes.InternalServerError)), userContext)
+
+      }
+
+  it should "fail when loading out of box configs (because 
whisk.logstore.splunk doesn't exist)" in {
+    assertThrows[ConfigReaderException[_]] {
+      val splunkStore = new SplunkLogStore(system)
+    }
+
+  }
+  it should "find logs based on activation timestamps" in {
+    //use the a flow that asserts the request structure and provides a 
response in the expected format
+    val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
+    val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
+    result shouldBe ActivationLogs(Vector("some log message", "some other log 
message"))
+  }
+
+  it should "fail to connect to bogus host" in {
+    //use the default http flow with the default bogus-host config
+    val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
+    val result = splunkStore.fetchLogs(activation)
+    whenReady(result.failed, Timeout(1.second)) { ex =>
+      ex shouldBe an[StreamTcpException]
+    }
+  }
+  it should "display an error if API cannot be reached" in {
+    //use a flow that generates a 500 response
+    val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
+    val result = splunkStore.fetchLogs(activation)
+    whenReady(result.failed, Timeout(1.second)) { ex =>
+      ex shouldBe an[RuntimeException]
+    }
+
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
markusthoem...@apache.org.

Reply via email to