This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5667fb0 User Events support in OpenWhisk Standalone mode (#4656)
5667fb0 is described below
commit 5667fb0d5abafc242d9b96b313464b2b700525f2
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Oct 3 22:43:52 2019 +0530
User Events support in OpenWhisk Standalone mode (#4656)
* Enable support to launch user-event and prometheus
* Enable grafana support
* Remove unused command
* Update readme
* Update log message
* Include controller url in list of launched services
* Reuse existing prometheus config
---
core/standalone/README.md | 28 +++-
core/standalone/build.gradle | 22 +++
core/standalone/src/main/resources/standalone.conf | 7 +
.../openwhisk/standalone/StandaloneOpenWhisk.scala | 50 +++++--
.../org/apache/openwhisk/standalone/Unzip.scala | 47 +++++++
.../openwhisk/standalone/UserEventLauncher.scala | 149 +++++++++++++++++++++
6 files changed, 294 insertions(+), 9 deletions(-)
diff --git a/core/standalone/README.md b/core/standalone/README.md
index 2d84671..1fa0af7 100644
--- a/core/standalone/README.md
+++ b/core/standalone/README.md
@@ -96,6 +96,8 @@ $ java -jar openwhisk-standalone.jar -h
--kafka-ui Enable Kafka UI
-m, --manifest <arg> Manifest json defining the supported
runtimes
-p, --port <arg> Server port
+ --user-events Enable User Events along with Prometheus and
+ Grafana
-v, --verbose
--zk-port <arg> Zookeeper port. If not specified then 2181
or
some random free port (if 2181 is busy)
would
@@ -236,7 +238,7 @@ java -jar openwhisk-standalone.jar --kafka --kafka-ui
```
By default the ui server would be accessible at http://localhost:9000. In case
9000 port is busy then a random port would
-be selected. TO find out the port look for message in log like below (or grep
for `whisk-kafka-drop-ui`)
+be selected. To find out the port look for message in log like below (or grep
for `whisk-kafka-drop-ui`)
```
[ 9092 ] localhost:9092 (kafka)
@@ -245,9 +247,33 @@ be selected. TO find out the port look for message in log
like below (or grep fo
[ 9000 ] http://localhost:9000 (whisk-kafka-drop-ui)
```
+#### User Events
+
+Standalone OpenWhisk supports emitting [user events][7] and displaying them
via Grafana Dashboard. The metrics are
+consumed by [User Event Service][8] which converts them into metrics consumed
via Prometheus.
+
+```
+java -jar openwhisk-standalone.jar --user-events
+```
+
+This mode would launch an embedded Kafka, User Event service, Prometheus and
Grafana with preconfigured dashboards.
+
+```
+Launched service details
+
+[ 9092 ] localhost:9092 (kafka)
+[ 9091 ] 192.168.65.2:9091 (kafka-docker)
+[ 2181 ] Zookeeper (zookeeper)
+[ 3235 ] http://localhost:3235 (whisk-user-events)
+[ 9090 ] http://localhost:9090 (whisk-prometheus)
+[ 3000 ] http://localhost:3000 (whisk-grafana)
+```
+
[1]: https://github.com/apache/incubator-openwhisk/blob/master/docs/cli.md
[2]: https://github.com/apache/incubator-openwhisk/blob/master/docs/samples.md
[3]: https://github.com/apache/incubator-openwhisk-apigateway
[4]:
https://github.com/apache/incubator-openwhisk/blob/master/docs/apigateway.md
[5]: https://github.com/embeddedkafka/embedded-kafka
[6]: https://github.com/obsidiandynamics/kafdrop
+[7]:
https://github.com/apache/openwhisk/blob/master/docs/metrics.md#user-specific-metrics
+[8]:
https://github.com/apache/openwhisk/blob/master/core/monitoring/user-events/README.md
diff --git a/core/standalone/build.gradle b/core/standalone/build.gradle
index 1879c56..3c57c7a 100644
--- a/core/standalone/build.gradle
+++ b/core/standalone/build.gradle
@@ -80,8 +80,24 @@ task copyGWActions() {
}
}
+task copyGrafanaConfig() {
+ doLast {
+ def grafanaDir = new File(project.rootProject.getProjectDir(),
"core/monitoring/user-events/compose/grafana")
+ def grafanaBuildDir = mkdir("$buildDir/tmp/grafana")
+ def zipFileName = "grafana-config.zip"
+ def zipFile = new File(grafanaBuildDir, zipFileName)
+ if (!zipFile.exists()) {
+ ant.zip(destfile:zipFile){
+ fileset(dir:grafanaDir)
+ }
+ logger.info("Created grafana config zip $zipFileName")
+ }
+ }
+}
+
processResources.dependsOn copySwagger
processResources.dependsOn copyGWActions
+processResources.dependsOn copyGrafanaConfig
processResources {
from(new File(project.rootProject.projectDir,
"ansible/files/runtimes.json")) {
@@ -91,6 +107,12 @@ processResources {
include "*.json"
into("couch")
}
+ from(file("$buildDir/tmp/grafana/grafana-config.zip")){
+ into(".")
+ }
+ from(new File(project.rootProject.getProjectDir(),
"core/monitoring/user-events/compose/prometheus/prometheus.yml")){
+ into(".")
+ }
//Implement the logic present in controller Docker file
from(project.swaggerUiDir) {
include "index.html"
diff --git a/core/standalone/src/main/resources/standalone.conf
b/core/standalone/src/main/resources/standalone.conf
index 4d3463b..ffd7316 100644
--- a/core/standalone/src/main/resources/standalone.conf
+++ b/core/standalone/src/main/resources/standalone.conf
@@ -114,8 +114,15 @@ whisk {
"logCleanup_design_document_for_activations_db.json"
]
}
+
+ user-events {
+ image = "openwhisk/user-events:nightly"
+ prometheus-image = "prom/prometheus:v2.5.0"
+ grafana-image = "grafana/grafana:6.1.6"
+ }
}
apache-client {
retry-no-http-response-exception = true
}
+
}
diff --git
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
index 093c37e..854f8c6 100644
---
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
+++
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
@@ -94,6 +94,8 @@ class Conf(arguments: Seq[String]) extends
ScallopConf(arguments) {
noshort = true,
required = false)
+ val userEvents = opt[Boolean](descr = "Enable User Events along with
Prometheus and Grafana", noshort = true)
+
verify()
val colorEnabled = !disableColorLogging()
@@ -108,6 +110,7 @@ object StandaloneConfigKeys {
val redisConfigKey = "whisk.standalone.redis"
val apiGwConfigKey = "whisk.standalone.api-gateway"
val couchDBConfigKey = "whisk.standalone.couchdb"
+ val userEventConfigKey = "whisk.standalone.user-events"
}
object StandaloneOpenWhisk extends SLF4JLogging {
@@ -175,19 +178,27 @@ object StandaloneOpenWhisk extends SLF4JLogging {
val (dataDir, workDir) = initializeDirs(conf)
val (dockerClient, dockerSupport) = prepareDocker(conf)
+ val defaultSvcs = Seq(
+ ServiceContainer(
+ conf.port(),
+ s"http://${StandaloneDockerSupport.getLocalHostName()}:${conf.port()}",
+ "Controller"))
+
val (apiGwApiPort, apiGwSvcs) = if (conf.apiGw()) {
startApiGateway(conf, dockerClient, dockerSupport)
} else (-1, Seq.empty)
- val (kafkaPort, kafkaSvcs) = if (conf.kafka()) {
+ val (kafkaDockerPort, kafkaSvcs) = if (conf.kafka() || conf.userEvents()) {
startKafka(workDir, dockerClient, conf, conf.kafkaUi())
} else (-1, Seq.empty)
val couchSvcs = if (conf.couchdb()) Some(startCouchDb(dataDir,
dockerClient)) else None
- val svcs = Seq(apiGwSvcs, couchSvcs.toList, kafkaSvcs).flatten
- if (svcs.nonEmpty) {
- new ServiceInfoLogger(conf, svcs, dataDir).run()
- }
+ val userEventSvcs =
+ if (conf.userEvents()) startUserEvents(conf.port(), kafkaDockerPort,
workDir, dataDir, dockerClient)
+ else Seq.empty
+
+ val svcs = Seq(defaultSvcs, apiGwSvcs, couchSvcs.toList, kafkaSvcs,
userEventSvcs).flatten
+ new ServiceInfoLogger(conf, svcs, dataDir).run()
startServer(conf)
new ServerStartupCheck(conf.serverUrl, "OpenWhisk").waitForServerToStart()
@@ -433,12 +444,13 @@ object StandaloneOpenWhisk extends SLF4JLogging {
as: ActorSystem,
ec: ExecutionContext,
materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
- val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
implicit val tid: TransactionId = TransactionId(systemPrefix + "kafka")
+ val kafkaPort = getPort(conf.kafkaPort.toOption, preferredKafkaPort)
+ val kafkaDockerPort = getPort(conf.kafkaDockerPort.toOption,
preferredKafkaDockerPort)
val k = new KafkaLauncher(
dockerClient,
kafkaPort,
- getPort(conf.kafkaDockerPort.toOption, preferredKafkaDockerPort),
+ kafkaDockerPort,
getPort(conf.zkPort.toOption, preferredZkPort),
workDir,
kafkaUi)
@@ -457,7 +469,29 @@ object StandaloneOpenWhisk extends SLF4JLogging {
setConfigProp(WhiskConfig.kafkaHostList, s"localhost:$kafkaPort")
setSysProp("whisk.spi.MessagingProvider",
"org.apache.openwhisk.connector.kafka.KafkaMessagingProvider")
setSysProp("whisk.spi.LoadBalancerProvider",
"org.apache.openwhisk.standalone.KafkaAwareLeanBalancer")
- (kafkaPort, services)
+ (kafkaDockerPort, services)
+ }
+
+ private def startUserEvents(owPort: Int,
+ kafkaDockerPort: Int,
+ workDir: File,
+ dataDir: File,
+ dockerClient: StandaloneDockerClient)(
+ implicit logging: Logging,
+ as: ActorSystem,
+ ec: ExecutionContext,
+ materializer: ActorMaterializer): Seq[ServiceContainer] = {
+ implicit val tid: TransactionId = TransactionId(systemPrefix +
"userevents")
+ val k = new UserEventLauncher(dockerClient, owPort, kafkaDockerPort,
workDir, dataDir)
+
+ val f = k.run()
+ val g = f.andThen {
+ case Success(_) =>
+ logging.info(this, s"User events started successfully")
+ case Failure(t) =>
+ logging.error(this, "Error starting Kafka" + t)
+ }
+ Await.result(g, 5.minutes)
}
private def getPort(configured: Option[Int], preferred: Int): Int = {
diff --git
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala
new file mode 100644
index 0000000..ea5136d
--- /dev/null
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/Unzip.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.{File, FileOutputStream, InputStream}
+import java.util.zip.ZipInputStream
+
+object Unzip {
+
+ def apply(is: InputStream, dir: File): Unit = {
+ //Based on https://stackoverflow.com/a/40547896/1035417
+ val zis = new ZipInputStream((is))
+ val dest = dir.toPath
+ Stream.continually(zis.getNextEntry).takeWhile(_ != null).foreach {
zipEntry =>
+ if (!zipEntry.isDirectory) {
+ val outPath = dest.resolve(zipEntry.getName)
+ val outPathParent = outPath.getParent
+ if (!outPathParent.toFile.exists()) {
+ outPathParent.toFile.mkdirs()
+ }
+
+ val outFile = outPath.toFile
+ val out = new FileOutputStream(outFile)
+ val buffer = new Array[Byte](4096)
+ Stream.continually(zis.read(buffer)).takeWhile(_ !=
-1).foreach(out.write(buffer, 0, _))
+ out.close()
+ }
+ }
+ zis.close()
+ }
+
+}
diff --git
a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
new file mode 100644
index 0000000..cbfc04b
--- /dev/null
+++
b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/UserEventLauncher.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import
org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort,
containerName, createRunCmd}
+import pureconfig.loadConfigOrThrow
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class UserEventLauncher(docker: StandaloneDockerClient, owPort: Int,
kafkaDockerPort: Int, workDir: File, dataDir: File)(
+ implicit logging: Logging,
+ ec: ExecutionContext,
+ actorSystem: ActorSystem,
+ materializer: ActorMaterializer,
+ tid: TransactionId) {
+
+ //owPort+1 is used by Api Gateway
+ private val userEventPort = checkOrAllocatePort(owPort + 2)
+ private val prometheusPort = checkOrAllocatePort(9090)
+ private val grafanaPort = checkOrAllocatePort(3000)
+
+ case class UserEventConfig(image: String, prometheusImage: String,
grafanaImage: String)
+
+ private val userEventConfig =
loadConfigOrThrow[UserEventConfig](StandaloneConfigKeys.userEventConfigKey)
+
+ private val hostIp = StandaloneDockerSupport.getLocalHostIp()
+
+ def run(): Future[Seq[ServiceContainer]] = {
+ for {
+ userEvent <- runUserEvents()
+ (promContainer, promSvc) <- runPrometheus()
+ grafanaSvc <- runGrafana(promContainer)
+ } yield {
+ logging.info(this, "Enabled the user-event config")
+ System.setProperty("whisk.user-events.enabled", "true")
+ Seq(userEvent, promSvc, grafanaSvc)
+ }
+ }
+
+ def runUserEvents(): Future[ServiceContainer] = {
+ val env = Map("KAFKA_HOSTS" -> s"$hostIp:$kafkaDockerPort")
+
+ logging.info(this, s"Starting User Events: $userEventPort")
+ val name = containerName("user-events")
+ val params = Map("-p" -> Set(s"$userEventPort:9095"))
+ val args = createRunCmd(name, env, params)
+
+ val f = docker.runDetached(userEventConfig.image, args, true)
+ f.map(_ => ServiceContainer(userEventPort,
s"http://localhost:$userEventPort", name))
+ }
+
+ def runPrometheus(): Future[(StandaloneDockerContainer, ServiceContainer)] =
{
+ logging.info(this, s"Starting Prometheus at $prometheusPort")
+ val baseParams = Map("-p" -> Set(s"$prometheusPort:9090"))
+ val promConfigDir = newDir(workDir, "prometheus")
+ val promDataDir = newDir(dataDir, "prometheus")
+
+ val configFile = new File(promConfigDir, "prometheus.yml")
+ FileUtils.write(configFile, prometheusConfig, UTF_8)
+
+ val volParams = Map(
+ "-v" -> Set(s"${promDataDir.getAbsolutePath}:/prometheus",
s"${promConfigDir.getAbsolutePath}:/etc/prometheus/"))
+ val name = containerName("prometheus")
+ val args = createRunCmd(name, Map.empty, baseParams ++ volParams)
+ val f = docker.runDetached(userEventConfig.prometheusImage, args,
shouldPull = true)
+ val sc = ServiceContainer(prometheusPort,
s"http://localhost:$prometheusPort", name)
+ f.map(c => (c, sc))
+ }
+
+ def runGrafana(promContainer: StandaloneDockerContainer):
Future[ServiceContainer] = {
+ logging.info(this, s"Starting Grafana at $grafanaPort")
+ val baseParams = Map("-p" -> Set(s"$grafanaPort:3000"))
+ val grafanaConfigDir = newDir(workDir, "grafana")
+ val grafanaDataDir = newDir(dataDir, "grafana")
+
+ val promUrl = s"http://$hostIp:$prometheusPort"
+ unzipGrafanaConfig(grafanaConfigDir, promUrl)
+
+ val env = Map(
+ "GF_PATHS_PROVISIONING" -> "/etc/grafana/provisioning",
+ "GF_USERS_ALLOW_SIGN_UP" -> "false",
+ "GF_AUTH_ANONYMOUS_ENABLED" -> "true",
+ "GF_AUTH_ANONYMOUS_ORG_NAME" -> "Main Org.",
+ "GF_AUTH_ANONYMOUS_ORG_ROLE" -> "Admin")
+
+ val volParams = Map(
+ "-v" -> Set(
+ s"${grafanaDataDir.getAbsolutePath}:/var/lib/grafanas",
+
s"${grafanaConfigDir.getAbsolutePath}/provisioning/:/etc/grafana/provisioning/",
+
s"${grafanaConfigDir.getAbsolutePath}/dashboards/:/var/lib/grafana/dashboards/"))
+ val name = containerName("grafana")
+ val args = createRunCmd(name, env, baseParams ++ volParams)
+ val f = docker.runDetached(userEventConfig.grafanaImage, args, shouldPull
= true)
+ val sc = ServiceContainer(grafanaPort, s"http://localhost:$grafanaPort",
name)
+ f.map(_ => sc)
+ }
+
+ private def prometheusConfig = {
+ val config = IOUtils.resourceToString("/prometheus.yml", UTF_8)
+ val pattern = "'user-events:9095'"
+ require(config.contains(pattern), s"Did not found expected pattern
$pattern in prometheus config $config")
+
+ val targets = s"'$hostIp:$userEventPort', '$hostIp:$owPort'"
+ config.replace(pattern, targets)
+ }
+
+ private def unzipGrafanaConfig(configDir: File, promUrl: String): Unit = {
+ val is = getClass.getResourceAsStream("/grafana-config.zip")
+ if (is != null) {
+ Unzip(is, configDir)
+ val configFile = new File(configDir,
"provisioning/datasources/datasource.yml")
+ val config = FileUtils.readFileToString(configFile, UTF_8)
+ val updatedConfig = config.replace("http://prometheus:9090", promUrl)
+ FileUtils.write(configFile, updatedConfig, UTF_8)
+ } else {
+ logging.warn(
+ this,
+ "Did not found the grafana-config.zip in classpath. Make sure its
packaged and present in classpath")
+ }
+ }
+
+ private def newDir(baseDir: File, name: String) = {
+ val dir = new File(baseDir, name)
+ FileUtils.forceMkdir(dir)
+ dir
+ }
+}