This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5667fb0  User Events support in OpenWhisk Standalone mode (#4656)
5667fb0 is described below

commit 5667fb0d5abafc242d9b96b313464b2b700525f2
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Oct 3 22:43:52 2019 +0530

    User Events support in OpenWhisk Standalone mode (#4656)
    
    * Enable support to launch user-event and prometheus
    
    * Enable grafana support
    
    * Remove unused command
    
    * Update readme
    
    * Update log message
    
    * Include controller url in list of launched services
    
    * Reuse existing prometheus config
---
 core/standalone/README.md                          |  28 +++-
 core/standalone/build.gradle                       |  22 +++
 core/standalone/src/main/resources/standalone.conf |   7 +
 .../openwhisk/standalone/StandaloneOpenWhisk.scala |  50 +++++--
 .../org/apache/openwhisk/standalone/Unzip.scala    |  47 +++++++
 .../openwhisk/standalone/UserEventLauncher.scala   | 149 +++++++++++++++++++++
 6 files changed, 294 insertions(+), 9 deletions(-)

diff --git a/core/standalone/README.md b/core/standalone/README.md
index 2d84671..1fa0af7 100644
--- a/core/standalone/README.md
+++ b/core/standalone/README.md
@@ -96,6 +96,8 @@ $ java -jar openwhisk-standalone.jar -h
       --kafka-ui                   Enable Kafka UI
   -m, --manifest  <arg>            Manifest json defining the supported 
runtimes
   -p, --port  <arg>                Server port
+      --user-events                Enable User Events along with Prometheus and
+                                   Grafana
   -v, --verbose
       --zk-port  <arg>             Zookeeper port. If not specified then 2181 
or
                                    some random free port (if 2181 is busy) 
would
@@ -236,7 +238,7 @@ java -jar openwhisk-standalone.jar --kafka --kafka-ui
 ```
 
 By default the ui server would be accessible at http://localhost:9000. In case 
9000 port is busy then a random port would
-be selected. TO find out the port look for message in log like below (or grep 
for `whisk-kafka-drop-ui`)
+be selected. To find out the port look for message in log like below (or grep 
for `whisk-kafka-drop-ui`)
 
 ```
 [ 9092  ] localhost:9092 (kafka)
@@ -245,9 +247,33 @@ be selected. TO find out the port look for message in log 
like below (or grep fo
 [ 9000  ] http://localhost:9000 (whisk-kafka-drop-ui)
 ```
 
+#### User Events
+
+Standalone OpenWhisk supports emitting [user events][7] and displaying them 
via Grafana Dashboard. The metrics are
+consumed by [User Event Service][8] which converts them into metrics consumed 
via Prometheus.
+
+```
+java -jar openwhisk-standalone.jar --user-events
+```
+
+This mode would launch an embedded Kafka, User Event service, Prometheus and 
Grafana with preconfigured dashboards.
+
+```
+Launched service details
+
+[ 9092  ] localhost:9092 (kafka)
+[ 9091  ] 192.168.65.2:9091 (kafka-docker)
+[ 2181  ] Zookeeper (zookeeper)
+[ 3235  ] http://localhost:3235 (whisk-user-events)
+[ 9090  ] http://localhost:9090 (whisk-prometheus)
+[ 3000  ] http://localhost:3000 (whisk-grafana)
+```
+
 [1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/cli.md
 [2]: https://github.com/apache/incubator-openwhisk/blob/master/docs/samples.md
 [3]: https://github.com/apache/incubator-openwhisk-apigateway
 [4]: 
https://github.com/apache/incubator-openwhisk/blob/master/docs/apigateway.md
 [5]: https://github.com/embeddedkafka/embedded-kafka
 [6]: https://github.com/obsidiandynamics/kafdrop
+[7]: 
https://github.com/apache/openwhisk/blob/master/docs/metrics.md#user-specific-metrics
+[8]: 
https://github.com/apache/openwhisk/blob/master/core/monitoring/user-events/README.md
diff --git a/core/standalone/build.gradle b/core/standalone/build.gradle
index 1879c56..3c57c7a 100644
--- a/core/standalone/build.gradle
+++ b/core/standalone/build.gradle
@@ -80,8 +80,24 @@ task copyGWActions() {
     }
 }
 
+task copyGrafanaConfig() {
+    doLast {
+        def grafanaDir = new File(project.rootProject.getProjectDir(), 
"core/monitoring/user-events/compose/grafana")
+        def grafanaBuildDir = mkdir("$buildDir/tmp/grafana")
+        def zipFileName = "grafana-config.zip"
+        def zipFile = new File(grafanaBuildDir, zipFileName)
+        if (!zipFile.exists()) {
+            ant.zip(destfile:zipFile){
+                fileset(dir:grafanaDir)
+            }
+            logger.info("Created grafana config zip $zipFileName")
+        }
+    }
+}
+
 processResources.dependsOn copySwagger
 processResources.dependsOn copyGWActions
+processResources.dependsOn copyGrafanaConfig
 
 processResources {
     from(new File(project.rootProject.projectDir, 
"ansible/files/runtimes.json")) {
@@ -91,6 +107,12 @@ processResources {
         include "*.json"
         into("couch")
     }
+    from(file("$buildDir/tmp/grafana/grafana-config.zip")){
+        into(".")
+    }
+    from(new File(project.rootProject.getProjectDir(), 
"core/monitoring/user-events/compose/prometheus/prometheus.yml")){
+        into(".")
+    }
     //Implement the logic present in controller Docker file
     from(project.swaggerUiDir) {
         include "index.html"
diff --git a/core/standalone/src/main/resources/standalone.conf 
b/core/standalone/src/main/resources/standalone.conf
index 4d3463b..ffd7316 100644
--- a/core/standalone/src/main/resources/standalone.conf
+++ b/core/standalone/src/main/resources/standalone.conf
@@ -114,8 +114,15 @@ whisk {
         "logCleanup_design_document_for_activations_db.json"
       ]
     }
+
+    user-events {
+      image = "openwhisk/user-events:nightly"
+      prometheus-image = "prom/prometheus:v2.5.0"
+      grafana-image = "grafana/grafana:6.1.6"
+    }
   }
   apache-client {
     retry-no-http-response-exception = true
   }
+
 }
diff --git 
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
 
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
index 093c37e..854f8c6 100644
--- 
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
+++ 
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
@@ -94,6 +94,8 @@ class Conf(arguments: Seq[String]) extends 
ScallopConf(arguments) {
     noshort = true,
     required = false)
 
+  val userEvents = opt[Boolean](descr = "Enable User Events along with 
Prometheus and Grafana", noshort = true)
+
   verify()
 
   val colorEnabled = !disableColorLogging()
@@ -108,6 +110,7 @@ object StandaloneConfigKeys {
   val redisConfigKey = "whisk.standalone.redis"
   val apiGwConfigKey = "whisk.standalone.api-gateway"
   val couchDBConfigKey = "whisk.standalone.couchdb"
+  val userEventConfigKey = "whisk.standalone.user-events"
 }
 
 object StandaloneOpenWhisk extends SLF4JLogging {
@@ -175,19 +178,27 @@ object StandaloneOpenWhisk extends SLF4JLogging {
     val (dataDir, workDir) = initializeDirs(conf)
     val (dockerClient, dockerSupport) = prepareDocker(conf)
 
+    val defaultSvcs = Seq(
+      ServiceContainer(
+        conf.port(),
+        s"http://${StandaloneDockerSupport.getLocalHostName()}:${conf.port()}",
+        "Controller"))
+
     val (apiGwApiPort, apiGwSvcs) = if (conf.apiGw()) {
       startApiGateway(conf, dockerClient, dockerSupport)
     } else (-1, Seq.empty)
 
-    val (kafkaPort, kafkaSvcs) = if (conf.kafka()) {
+    val (kafkaDockerPort, kafkaSvcs) = if (conf.kafka() || conf.userEvents()) {
       startKafka(workDir, dockerClient, conf, conf.kafkaUi())
     } else (-1, Seq.empty)
 
     val couchSvcs = if (conf.couchdb()) Some(startCouchDb(dataDir, 
dockerClient)) else None
-    val svcs = Seq(apiGwSvcs, couchSvcs.toList, kafkaSvcs).flatten
-    if (svcs.nonEmpty) {
-      new ServiceInfoLogger(conf, svcs, dataDir).run()
-    }
+    val userEventSvcs =
+      if (conf.userEvents()) startUserEvents(conf.port(), kafkaDockerPort, 
workDir, dataDir, dockerClient)
+      else Seq.empty
+
+    val svcs = Seq(defaultSvcs, apiGwSvcs, couchSvcs.toList, kafkaSvcs, 
userEventSvcs).flatten
+    new ServiceInfoLogger(conf, svcs, dataDir).run()
 
     startServer(conf)
     new ServerStartupCheck(conf.serverUrl, "OpenWhisk").waitForServerToStart()
@@ -433,12 +444,13 @@ object StandaloneOpenWhisk extends SLF4JLogging {
     as: ActorSystem,
     ec: ExecutionContext,
     materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
-    val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
     implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
+    val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
+    val kafkaDockerPort = getPort(conf.kafkaDockerPort.toOption, 
preferredKafkaDockerPort)
     val k = new KafkaLauncher(
       dockerClient,
       kafkaPort,
-      getPort(conf.kafkaDockerPort.toOption, preferredKafkaDockerPort),
+      kafkaDockerPort,
       getPort(conf.zkPort.toOption, preferredZkPort),
       workDir,
       kafkaUi)
@@ -457,7 +469,29 @@ object StandaloneOpenWhisk extends SLF4JLogging {
     setConfigProp(WhiskConfig.kafkaHostList, s"localhost:$kafkaPort")
     setSysProp("whisk.spi.MessagingProvider", 
"org.apache.openwhisk.connector.kafka.KafkaMessagingProvider")
     setSysProp("whisk.spi.LoadBalancerProvider", 
"org.apache.openwhisk.standalone.KafkaAwareLeanBalancer")
-    (kafkaPort, services)
+    (kafkaDockerPort, services)
+  }
+
+  private def startUserEvents(owPort: Int,
+                              kafkaDockerPort: Int,
+                              workDir: File,
+                              dataDir: File,
+                              dockerClient: StandaloneDockerClient)(
+    implicit logging: Logging,
+    as: ActorSystem,
+    ec: ExecutionContext,
+    materializer: ActorMaterializer): Seq[ServiceContainer] = {
+    implicit val tid: TransactionId = TransactionId(systemPrefix + 
"userevents")
+    val k = new UserEventLauncher(dockerClient, owPort, kafkaDockerPort, 
workDir, dataDir)
+
+    val f = k.run()
+    val g = f.andThen {
+      case Success(_) =>
+        logging.info(this, s"User events started successfully")
+      case Failure(t) =>
+        logging.error(this, "Error starting Kafka" + t)
+    }
+    Await.result(g, 5.minutes)
   }
 
   private def getPort(configured: Option[Int], preferred: Int): Int = {
diff --git 
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala 
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala
new file mode 100644
index 0000000..ea5136d
--- /dev/null
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.{File, FileOutputStream, InputStream}
+import java.util.zip.ZipInputStream
+
+object Unzip {
+
+  def apply(is: InputStream, dir: File): Unit = {
+    //Based on https://stackoverflow.com/a/40547896/1035417
+    val zis = new ZipInputStream((is))
+    val dest = dir.toPath
+    Stream.continually(zis.getNextEntry).takeWhile(_ != null).foreach { 
zipEntry =>
+      if (!zipEntry.isDirectory) {
+        val outPath = dest.resolve(zipEntry.getName)
+        val outPathParent = outPath.getParent
+        if (!outPathParent.toFile.exists()) {
+          outPathParent.toFile.mkdirs()
+        }
+
+        val outFile = outPath.toFile
+        val out = new FileOutputStream(outFile)
+        val buffer = new Array[Byte](4096)
+        Stream.continually(zis.read(buffer)).takeWhile(_ != 
-1).foreach(out.write(buffer, 0, _))
+        out.close()
+      }
+    }
+    zis.close()
+  }
+
+}
diff --git 
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
 
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
new file mode 100644
index 0000000..cbfc04b
--- /dev/null
+++ 
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import 
org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, 
containerName, createRunCmd}
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class UserEventLauncher(docker: StandaloneDockerClient, owPort: Int, 
kafkaDockerPort: Int, workDir: File, dataDir: File)(
+  implicit logging: Logging,
+  ec: ExecutionContext,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  tid: TransactionId) {
+
+  //owPort+1 is used by Api Gateway
+  private val userEventPort = checkOrAllocatePort(owPort + 2)
+  private val prometheusPort = checkOrAllocatePort(9090)
+  private val grafanaPort = checkOrAllocatePort(3000)
+
+  case class UserEventConfig(image: String, prometheusImage: String, 
grafanaImage: String)
+
+  private val userEventConfig = 
loadConfigOrThrow[UserEventConfig](StandaloneConfigKeys.userEventConfigKey)
+
+  private val hostIp = StandaloneDockerSupport.getLocalHostIp()
+
+  def run(): Future[Seq[ServiceContainer]] = {
+    for {
+      userEvent <- runUserEvents()
+      (promContainer, promSvc) <- runPrometheus()
+      grafanaSvc <- runGrafana(promContainer)
+    } yield {
+      logging.info(this, "Enabled the user-event config")
+      System.setProperty("whisk.user-events.enabled", "true")
+      Seq(userEvent, promSvc, grafanaSvc)
+    }
+  }
+
+  def runUserEvents(): Future[ServiceContainer] = {
+    val env = Map("KAFKA_HOSTS" -> s"$hostIp:$kafkaDockerPort")
+
+    logging.info(this, s"Starting User Events: $userEventPort")
+    val name = containerName("user-events")
+    val params = Map("-p" -> Set(s"$userEventPort:9095"))
+    val args = createRunCmd(name, env, params)
+
+    val f = docker.runDetached(userEventConfig.image, args, true)
+    f.map(_ => ServiceContainer(userEventPort, 
s"http://localhost:$userEventPort";, name))
+  }
+
+  def runPrometheus(): Future[(StandaloneDockerContainer, ServiceContainer)] = 
{
+    logging.info(this, s"Starting Prometheus at $prometheusPort")
+    val baseParams = Map("-p" -> Set(s"$prometheusPort:9090"))
+    val promConfigDir = newDir(workDir, "prometheus")
+    val promDataDir = newDir(dataDir, "prometheus")
+
+    val configFile = new File(promConfigDir, "prometheus.yml")
+    FileUtils.write(configFile, prometheusConfig, UTF_8)
+
+    val volParams = Map(
+      "-v" -> Set(s"${promDataDir.getAbsolutePath}:/prometheus", 
s"${promConfigDir.getAbsolutePath}:/etc/prometheus/"))
+    val name = containerName("prometheus")
+    val args = createRunCmd(name, Map.empty, baseParams ++ volParams)
+    val f = docker.runDetached(userEventConfig.prometheusImage, args, 
shouldPull = true)
+    val sc = ServiceContainer(prometheusPort, 
s"http://localhost:$prometheusPort";, name)
+    f.map(c => (c, sc))
+  }
+
+  def runGrafana(promContainer: StandaloneDockerContainer): 
Future[ServiceContainer] = {
+    logging.info(this, s"Starting Grafana at $grafanaPort")
+    val baseParams = Map("-p" -> Set(s"$grafanaPort:3000"))
+    val grafanaConfigDir = newDir(workDir, "grafana")
+    val grafanaDataDir = newDir(dataDir, "grafana")
+
+    val promUrl = s"http://$hostIp:$prometheusPort";
+    unzipGrafanaConfig(grafanaConfigDir, promUrl)
+
+    val env = Map(
+      "GF_PATHS_PROVISIONING" -> "/etc/grafana/provisioning",
+      "GF_USERS_ALLOW_SIGN_UP" -> "false",
+      "GF_AUTH_ANONYMOUS_ENABLED" -> "true",
+      "GF_AUTH_ANONYMOUS_ORG_NAME" -> "Main Org.",
+      "GF_AUTH_ANONYMOUS_ORG_ROLE" -> "Admin")
+
+    val volParams = Map(
+      "-v" -> Set(
+        s"${grafanaDataDir.getAbsolutePath}:/var/lib/grafanas",
+        
s"${grafanaConfigDir.getAbsolutePath}/provisioning/:/etc/grafana/provisioning/",
+        
s"${grafanaConfigDir.getAbsolutePath}/dashboards/:/var/lib/grafana/dashboards/"))
+    val name = containerName("grafana")
+    val args = createRunCmd(name, env, baseParams ++ volParams)
+    val f = docker.runDetached(userEventConfig.grafanaImage, args, shouldPull 
= true)
+    val sc = ServiceContainer(grafanaPort, s"http://localhost:$grafanaPort";, 
name)
+    f.map(_ => sc)
+  }
+
+  private def prometheusConfig = {
+    val config = IOUtils.resourceToString("/prometheus.yml", UTF_8)
+    val pattern = "'user-events:9095'"
+    require(config.contains(pattern), s"Did not found expected pattern 
$pattern in prometheus config $config")
+
+    val targets = s"'$hostIp:$userEventPort', '$hostIp:$owPort'"
+    config.replace(pattern, targets)
+  }
+
+  private def unzipGrafanaConfig(configDir: File, promUrl: String): Unit = {
+    val is = getClass.getResourceAsStream("/grafana-config.zip")
+    if (is != null) {
+      Unzip(is, configDir)
+      val configFile = new File(configDir, 
"provisioning/datasources/datasource.yml")
+      val config = FileUtils.readFileToString(configFile, UTF_8)
+      val updatedConfig = config.replace("http://prometheus:9090";, promUrl)
+      FileUtils.write(configFile, updatedConfig, UTF_8)
+    } else {
+      logging.warn(
+        this,
+        "Did not found the grafana-config.zip in classpath. Make sure its 
packaged and present in classpath")
+    }
+  }
+
+  private def newDir(baseDir: File, name: String) = {
+    val dir = new File(baseDir, name)
+    FileUtils.forceMkdir(dir)
+    dir
+  }
+}

Reply via email to