This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a47cd42  Emit user-faced metrics and events to Kafka. (#3552)
a47cd42 is described below

commit a47cd423ffcf695496465383750f0a0b9db29b1f
Author: Vadim Raskin <raskinva...@gmail.com>
AuthorDate: Mon Apr 30 13:09:58 2018 +0200

    Emit user-faced metrics and events to Kafka. (#3552)
    
    This introduces a new Kafka topic called "events", which accepts events of
    two types: Activations and Metrics. The first carries the metadata collected
    after an activation finishes (initTime, waitTime, responseCode, kind, etc.);
    the second carries user-related metrics of any kind (throttledActivations,
    concurrentActivations). The data is not aggregated; it is sent to Kafka
    immediately.
---
 ansible/group_vars/all                             |  1 +
 ansible/roles/controller/tasks/deploy.yml          |  1 +
 ansible/roles/invoker/tasks/deploy.yml             |  1 +
 common/scala/src/main/resources/application.conf   | 12 ++-
 .../src/main/scala/whisk/common/UserEvents.scala   | 35 ++++++++
 .../src/main/scala/whisk/core/WhiskConfig.scala    |  1 +
 .../main/scala/whisk/core/connector/Message.scala  | 90 +++++++++++++++++++--
 .../scala/whisk/core/controller/Controller.scala   |  7 +-
 .../core/entitlement/ActivationThrottler.scala     |  3 +
 .../scala/whisk/core/entitlement/Entitlement.scala | 41 ++++++++--
 .../whisk/core/entitlement/LocalEntitlement.scala  | 13 ++-
 .../whisk/core/containerpool/ContainerProxy.scala  |  8 +-
 .../scala/whisk/core/invoker/InvokerReactive.scala | 35 ++++++--
 docs/metrics.md                                    | 47 ++++++++++-
 tests/src/test/resources/application.conf.j2       |  3 +
 .../test/scala/whisk/common/UserEventTests.scala   | 94 ++++++++++++++++++++++
 .../containerpool/test/ContainerProxyTests.scala   | 44 ++++++----
 .../controller/test/ControllerTestCommon.scala     |  5 +-
 .../core/controller/test/WebActionsApiTests.scala  |  2 +-
 19 files changed, 392 insertions(+), 51 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 0bff876..f04807b 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -324,3 +324,4 @@ metrics:
     host: "{{ metrics_kamon_statsd_host | default('') }}"
     port: "{{ metrics_kamon_statsd_port | default('8125') }}"
 
+user_events: "{{ user_events_enabled | default(false) }}"
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 3f2b609..8173f00 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -151,6 +151,7 @@
       "CONFIG_whisk_db_actionsDdoc": "{{ db_whisk_actions_ddoc | default() }}"
       "CONFIG_whisk_db_activationsDdoc": "{{ db_whisk_activations_ddoc | 
default() }}"
       "CONFIG_whisk_db_activationsFilterDdoc": "{{ 
db_whisk_activations_filter_ddoc | default() }}"
+      "CONFIG_whisk_userEvents_enabled": "{{ user_events }}"
 
       "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
       "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 5841269..2643b54 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -175,6 +175,7 @@
         -e CONFIG_whisk_kafka_common_sslTruststorePassword='{{ kafka.ssl.keystore.password }}'
         -e CONFIG_whisk_kafka_common_sslKeystoreLocation='/conf/{{ kafka.ssl.keystore.name }}'
         -e CONFIG_whisk_kafka_common_sslKeystorePassword='{{ kafka.ssl.keystore.password }}'
+        -e CONFIG_whisk_userEvents_enabled='{{ user_events }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e CONFIG_whisk_couchdb_protocol='{{ db_protocol }}'
         -e CONFIG_whisk_couchdb_host='{{ db_host }}'
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index c12eb02..f665edb 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -50,7 +50,8 @@ kamon {
 }
 
 whisk {
-    # kafka related configuration
+    # kafka related configuration, the complete list of parameters is here:
+    # https://kafka.apache.org/documentation/#brokerconfigs
     kafka {
         replication-factor = 1
 
@@ -98,6 +99,11 @@ whisk {
                 retention-ms      =  172800000
                 max-message-bytes = ${whisk.activation.payload.max}
             }
+            events {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
         }
     }
     # db related configuration
@@ -136,6 +142,10 @@ whisk {
         local-image-prefix = "whisk"
     }
 
+    user-events {
+        enabled = false
+    }
+
     activation {
         payload {
             max = 1048576 // 5 m not possible because cross-referenced to kafka configurations
diff --git a/common/scala/src/main/scala/whisk/common/UserEvents.scala b/common/scala/src/main/scala/whisk/common/UserEvents.scala
new file mode 100644
index 0000000..0bff020
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/UserEvents.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import pureconfig.loadConfigOrThrow
+import whisk.core.ConfigKeys
+import whisk.core.connector.{EventMessage, MessageProducer}
+
+object UserEvents {
+
+  case class UserEventsConfig(enabled: Boolean)
+
+  val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
+
+  def send(producer: MessageProducer, em: => EventMessage) = {
+    if (enabled) {
+      producer.send("events", em)
+    }
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 1fd0154..231341f 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -220,6 +220,7 @@ object ConfigKeys {
   val logLimit = "whisk.log-limit"
   val activation = "whisk.activation"
   val activationPayload = s"$activation.payload"
+  val userEvents = "whisk.user-events"
 
   val runtimes = "whisk.runtimes"
 
diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 13e0ed6..b17de8f 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -18,15 +18,9 @@
 package whisk.core.connector
 
 import scala.util.Try
-
 import spray.json._
 import whisk.common.TransactionId
-import whisk.core.entity.ActivationId
-import whisk.core.entity.DocRevision
-import whisk.core.entity.FullyQualifiedEntityName
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.WhiskActivation
+import whisk.core.entity._
 
 /** Basic trait for messages that are sent on a message bus connector. */
 trait Message {
@@ -122,3 +116,85 @@ object PingMessage extends DefaultJsonProtocol {
   def parse(msg: String) = Try(serdes.read(msg.parseJson))
   implicit val serdes = jsonFormat(PingMessage.apply _, "name")
 }
+
+trait EventMessageBody extends Message {
+  def typeName: String
+}
+
+object EventMessageBody extends DefaultJsonProtocol {
+
+  implicit def format = new JsonFormat[EventMessageBody] {
+    def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
+      case m: Metric     => m.toJson
+      case a: Activation => a.toJson
+    }
+
+    def read(value: JsValue) =
+      if (value.asJsObject.fields.contains("metricName")) {
+        value.convertTo[Metric]
+      } else {
+        value.convertTo[Activation]
+      }
+  }
+}
+
+case class Activation(name: String,
+                      statusCode: Int,
+                      duration: Long,
+                      waitTime: Long,
+                      initTime: Long,
+                      kind: String,
+                      conductor: Boolean,
+                      memory: Int,
+                      causedBy: Boolean)
+    extends EventMessageBody {
+  val typeName = "Activation"
+  override def serialize = toJson.compactPrint
+
+  def toJson = Activation.activationFormat.write(this)
+}
+
+object Activation extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(activationFormat.read(msg.parseJson))
+  implicit val activationFormat =
+    jsonFormat(
+      Activation.apply _,
+      "name",
+      "statusCode",
+      "duration",
+      "waitTime",
+      "initTime",
+      "kind",
+      "conductor",
+      "memory",
+      "causedBy")
+}
+
+case class Metric(metricName: String, metricValue: Long) extends EventMessageBody {
+  val typeName = "Metric"
+  override def serialize = toJson.compactPrint
+  def toJson = Metric.metricFormat.write(this).asJsObject
+}
+
+object Metric extends DefaultJsonProtocol {
+  def parse(msg: String) = Try(metricFormat.read(msg.parseJson))
+  implicit val metricFormat = jsonFormat(Metric.apply _, "metricName", "metricValue")
+}
+
+case class EventMessage(source: String,
+                        body: EventMessageBody,
+                        subject: Subject,
+                        namespace: String,
+                        userId: UUID,
+                        eventType: String,
+                        timestamp: Long = System.currentTimeMillis())
+    extends Message {
+  override def serialize = EventMessage.format.write(this).compactPrint
+}
+
+object EventMessage extends DefaultJsonProtocol {
+  implicit val format =
+    jsonFormat(EventMessage.apply _, "source", "body", "subject", "namespace", "userId", "eventType", "timestamp")
+
+  def parse(msg: String) = format.read(msg.parseJson)
+}
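
As an aside, the serdes above can be exercised in isolation. Below is a minimal sketch assuming only the classes introduced in this file; the `ParseEventExample` object name is made up for the example, and the sample payload is the Metric example from docs/metrics.md further below:

```
import whisk.core.connector.{Activation, EventMessage, Metric}

// Illustrative only: parse an event in the wire format written to the "events" topic.
object ParseEventExample extends App {
  // Sample payload copied from the Metric example in docs/metrics.md.
  val raw =
    """{"body":{"metricName":"ConcurrentInvocations","metricValue":1},"eventType":"Metric","source":"controller0","subject":"guest","timestamp":1524476104419,"userId":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502","namespace":"guest"}"""

  // EventMessageBody.format dispatches on the presence of the "metricName" field.
  EventMessage.parse(raw).body match {
    case m: Metric     => println(s"metric ${m.metricName} = ${m.metricValue}")
    case a: Activation => println(s"activation ${a.name} finished with status ${a.statusCode}")
  }
}
```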
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 095a8b6..340df96 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -121,7 +121,8 @@ class Controller(val instance: InstanceId,
     SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
   logging.info(this, s"loadbalancer initialized: 
${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
 
-  private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+  private implicit val entitlementProvider =
+    new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = SpiLoader.get[LogStoreProvider].logStore(actorSystem)
 
@@ -227,6 +228,10 @@ object Controller {
       abort(s"failure during msgProvider.ensureTopic for topic 
cacheInvalidation")
     }
 
+    if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) {
+      abort(s"failure during msgProvider.ensureTopic for topic events")
+    }
+
     ExecManifest.initialize(config) match {
       case Success(_) =>
         val controller = new Controller(
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 563e7fe..b99385a 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -69,14 +69,17 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity
 sealed trait RateLimit {
   def ok: Boolean
   def errorMsg: String
+  def limitName: String
 }
 
 case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
   val ok: Boolean = count < allowed // must have slack for the current activation request
   override def errorMsg: String = Messages.tooManyConcurrentRequests(count, allowed)
+  val limitName: String = "ConcurrentRateLimit"
 }
 
 case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
   val ok: Boolean = count <= allowed // the count is already updated to account for the current request
   override def errorMsg: String = Messages.tooManyRequests(count, allowed)
+  val limitName: String = "TimedRateLimit"
 }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 1aa6e3c..478bda4 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -27,9 +27,10 @@ import akka.http.scaladsl.model.StatusCodes.Forbidden
 import akka.http.scaladsl.model.StatusCodes.TooManyRequests
 import whisk.core.entitlement.Privilege.ACTIVATE
 import whisk.core.entitlement.Privilege.REJECT
-import whisk.common.Logging
-import whisk.common.TransactionId
+import whisk.common.{Logging, TransactionId, UserEvents}
+import whisk.connector.kafka.KafkaMessagingProvider
 import whisk.core.WhiskConfig
+import whisk.core.connector.{EventMessage, Metric}
 import whisk.core.controller.RejectRequest
 import whisk.core.entity._
 import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
@@ -74,9 +75,10 @@ protected[core] object EntitlementProvider {
  * A trait that implements entitlements to resources. It performs checks for CRUD and Activation requests.
  * This is where enforcement of activation quotas takes place, in addition to basic authorization.
  */
-protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)(
-  implicit actorSystem: ActorSystem,
-  logging: Logging) {
+protected[core] abstract class EntitlementProvider(
+  config: WhiskConfig,
+  loadBalancer: LoadBalancer,
+  controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging) {
 
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
 
@@ -142,6 +144,8 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
       activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
       config.actionInvokeSystemOverloadLimit.toInt)
 
+  private val eventProducer = KafkaMessagingProvider.getProducer(this.config)
+
   /**
    * Grants a subject the right to access a resources.
    *
@@ -358,10 +362,37 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala
   private def checkThrottleOverload(throttle: Future[RateLimit], user: Identity)(
     implicit transid: TransactionId): Future[Unit] = {
     throttle.flatMap { limit =>
+      val userId = user.authkey.uuid
       if (limit.ok) {
+        limit match {
+          case c: ConcurrentRateLimit => {
+            val metric =
+              Metric("ConcurrentInvocations", c.count + 1)
+            UserEvents.send(
+              eventProducer,
+              EventMessage(
+                s"controller${controllerInstance.instance}",
+                metric,
+                user.subject,
+                user.namespace.toString,
+                userId,
+                metric.typeName))
+          }
+          case _ => // ignore
+        }
         Future.successful(())
       } else {
         logging.info(this, s"'${user.namespace}' has exceeded its throttle 
limit, ${limit.errorMsg}")
+        val metric = Metric(limit.limitName, 1)
+        UserEvents.send(
+          eventProducer,
+          EventMessage(
+            s"controller${controllerInstance.instance}",
+            metric,
+            user.subject,
+            user.namespace.toString,
+            userId,
+            metric.typeName))
         Future.failed(RejectRequest(TooManyRequests, limit.errorMsg))
       }
     }
diff --git a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
index e9155f7..fa0edf7 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/LocalEntitlement.scala
@@ -19,13 +19,11 @@ package whisk.core.entitlement
 
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.Future
-
 import akka.actor.ActorSystem
-
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
-import whisk.core.entity.Subject
+import whisk.core.entity.{InstanceId, Subject}
 import whisk.core.loadBalancer.LoadBalancer
 
 private object LocalEntitlementProvider {
@@ -34,10 +32,11 @@ private object LocalEntitlementProvider {
   private val matrix = TrieMap[(Subject, String), Set[Privilege]]()
 }
 
-protected[core] class LocalEntitlementProvider(private val config: WhiskConfig, private val loadBalancer: LoadBalancer)(
-  implicit actorSystem: ActorSystem,
-  logging: Logging)
-    extends EntitlementProvider(config, loadBalancer) {
+protected[core] class LocalEntitlementProvider(
+  private val config: WhiskConfig,
+  private val loadBalancer: LoadBalancer,
+  private val controllerInstance: InstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends EntitlementProvider(config, loadBalancer, controllerInstance) {
 
   private implicit val executionContext = actorSystem.dispatcher
 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index fddafd5..b75ad72 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -95,7 +95,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
  */
 class ContainerProxy(
   factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+  sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
   storeActivation: (TransactionId, WhiskActivation) => Future[Any],
   collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
   instance: InstanceId,
@@ -161,7 +161,7 @@ class ContainerProxy(
             // implicitly via a FailureMessage which will be processed later when the state
             // transitions to Running
             val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response)
-            sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex)
+            sendActiveAck(transid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid)
             storeActivation(transid, activation)
         }
         .flatMap { container =>
@@ -380,7 +380,7 @@ class ContainerProxy(
       }
 
     // Sending active ack. Entirely asynchronous and not waited upon.
-    activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex))
+    activation.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.authkey.uuid))
 
     // Adds logs to the raw activation.
     val activationWithLogs: Future[Either[ActivationLogReadingError, WhiskActivation]] = activation
@@ -422,7 +422,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
 object ContainerProxy {
   def props(
     factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
-    ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any],
+    ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) => Future[Any],
     store: (TransactionId, WhiskActivation) => Future[Any],
     collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
     instance: InstanceId,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 8601e4c..b132dd8 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -26,7 +26,7 @@ import akka.stream.ActorMaterializer
 import org.apache.kafka.common.errors.RecordTooLargeException
 import pureconfig._
 import spray.json._
-import whisk.common.{Logging, LoggingMarkers, Scheduler, TransactionId}
+import whisk.common._
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector._
 import whisk.core.containerpool._
@@ -40,6 +40,7 @@ import whisk.spi.SpiLoader
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
+import DefaultJsonProtocol._
 
 class InvokerReactive(
   config: WhiskConfig,
@@ -112,12 +113,12 @@ class InvokerReactive(
   private val ack = (tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
-                     controllerInstance: InstanceId) => {
+                     controllerInstance: InstanceId,
+                     userId: UUID) => {
     implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
       val msg = CompletionMessage(transid, res, instance)
-
       producer.send(s"completed${controllerInstance.toInt}", msg).andThen {
         case Success(_) =>
           logging.info(
@@ -125,6 +126,30 @@ class InvokerReactive(
             s"posted ${if (recovery) "recovery" else "completion"} of 
activation ${activationResult.activationId}")
       }
     }
+    // Potentially sends activation metadata to kafka if user events are enabled
+    UserEvents.send(
+      producer, {
+        val activation = Activation(
+          activationResult.namespace + EntityPath.PATHSEP + activationResult.name,
+          activationResult.response.statusCode,
+          activationResult.duration.getOrElse(0),
+          activationResult.annotations.getAs[Long](WhiskActivation.waitTimeAnnotation).getOrElse(0),
+          activationResult.annotations.getAs[Long](WhiskActivation.initTimeAnnotation).getOrElse(0),
+          activationResult.annotations.getAs[String](WhiskActivation.kindAnnotation).getOrElse("unknown_kind"),
+          activationResult.annotations.getAs[Boolean](WhiskActivation.conductorAnnotation).getOrElse(false),
+          activationResult.annotations
+            .getAs[ActionLimits](WhiskActivation.limitsAnnotation)
+            .map(al => al.memory.megabytes)
+            .getOrElse(0),
+          activationResult.annotations.getAs[Boolean](WhiskActivation.causedByAnnotation).getOrElse(false))
+        EventMessage(
+          s"invoker${instance.instance}",
+          activation,
+          activationResult.subject,
+          activationResult.namespace.toString,
+          userId,
+          activation.typeName)
+      })
 
     send(Right(if (blockingInvoke) activationResult else activationResult.withoutLogsOrResult)).recoverWith {
       case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
@@ -209,7 +234,7 @@ class InvokerReactive(
 
                 val activation = generateFallbackActivation(msg, response)
                 activationFeed ! MessageFeed.Processed
-                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex)
+                ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.authkey.uuid)
                 store(msg.transid, activation)
                 Future.successful(())
             }
@@ -219,7 +244,7 @@ class InvokerReactive(
           activationFeed ! MessageFeed.Processed
           val activation =
             generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
-          ack(msg.transid, activation, false, msg.rootControllerIndex)
+          ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.authkey.uuid)
           logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.")
           Future.successful(())
         }
diff --git a/docs/metrics.md b/docs/metrics.md
index 9ba60ef..3d2bb5e 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -17,9 +17,14 @@
 -->
 # Openwhisk Metric Support
 
-Openwhick contains the capability to send metric information to a statsd server. This capability is disabled per default. Instead metric information is normally written to the log files in logmarker format.
+Openwhisk distinguishes between system and user metrics (events).
 
-## Configuration
+System metrics typically contain information about system performance and can be sent to Kamon or written to log files in logmarker format. These metrics are typically used by OpenWhisk providers/operators.
+
+User metrics capture information about action performance and are sent to Kafka in the form of events. These metrics are intended to be consumed by OpenWhisk users, but they could also be used for billing or auditing purposes. Note that at the moment the events are not directly exposed to users; consuming them requires an additional Kafka-consumer-based microservice for data processing.
+
+## System specific metrics
+### Configuration
 
 Both capabilities can be enabled or disabled separately during deployment via Ansible configuration in the 'group_vars/all' file of an environment.
 
@@ -60,7 +65,7 @@ metrics_kamon_statsd_port: '8125'
 metrics_log: true
 ```
 
-## Testing the statsd metric support
+### Testing the statsd metric support
 
 The Kamon project provides an integrated docker image containing statsd and a connected Grafana dashboard via [this Github project](https://github.com/kamon-io/docker-grafana-graphite). This image is helpful for testing the metrics sent via statsd.
 
@@ -69,3 +74,39 @@ Please follow these [instructions](https://github.com/kamon-io/docker-grafana-gr
 The docker image exposes statsd via the (standard) port 8125 and a Grafana dashboard via port 8080 on your docker host.
 
 The address of your docker host has to be configured in the `metrics_kamon_statsd_host` configuration property.
+
+## User specific metrics
+### Configuration
+User metrics are disabled by default and can be explicitly enabled by setting the following property in one of the Ansible configuration files:
+```
+user_events: true
+```
+
+### Supported events
+Activation is an event that occurs after each activation. It includes the following execution metadata:
+```
+waitTime - internal system hold time
+initTime - time it took to initialise an action, e.g. docker init
+statusCode - status code of the invocation: 0 - success, 1 - application error, 2 - action developer error, 3 - internal OpenWhisk error
+duration - actual time the action code was running
+kind - action flavor, e.g. nodejs
+conductor - true for conductor backed actions
+memory - maximum memory allowed for action container
+causedBy - true for sequence actions
+```
+Metric is any user-specific event produced by the system. At the moment it includes the following information:
+```
+ConcurrentRateLimit - a user has exceeded its limit for concurrent invocations.
+TimedRateLimit - the user has reached its per minute limit for the number of invocations.
+ConcurrentInvocations - the number of in flight invocations per user.
+```
+
+Example events that can be consumed from Kafka.
+Activation:
+```
+{"body":{"statusCode":0,"duration":3,"name":"whisk.system/invokerHealthTestAction0","waitTime":583915671,"conductor":false,"kind":"nodejs:6","initTime":0,"memory": 256, "causedBy": false},"eventType":"Activation","source":"invoker0","subject":"whisk.system","timestamp":1524476122676,"userId":"d0888ad5-5a92-435e-888a-d55a92935e54","namespace":"whisk.system"}
+```
+Metric:
+```
+{"body":{"metricName":"ConcurrentInvocations","metricValue":1},"eventType":"Metric","source":"controller0","subject":"guest","timestamp":1524476104419,"userId":"23bc46b1-71f6-4ed5-8c54-816aa4f8c502","namespace":"guest"}
+```
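
For completeness, here is a rough sketch of the "additional Kafka-consumer-based microservice" mentioned above, wired up with the same KafkaConsumerConnector that the new UserEventTests below use. The consumer group id, the poll interval, and the implicit ActorSystem/Logging setup are assumptions made for this example and are not part of this commit:

```
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import whisk.common.{AkkaLogging, Logging}
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.core.WhiskConfig
import whisk.core.connector.{Activation, EventMessage, Metric}

import scala.concurrent.duration._

// Illustrative consumer that tails the new "events" topic and prints each event.
object UserEventTail extends App {
  // Assumed wiring: mirrors what the test harness provides implicitly.
  implicit val system: ActorSystem = ActorSystem("user-event-tail")
  implicit val logging: Logging = new AkkaLogging(akka.event.Logging.getLogger(system, this))

  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
  // "user-event-tail" is an arbitrary consumer group id chosen for this sketch.
  val consumer = new KafkaConsumerConnector(config.kafkaHosts, "user-event-tail", "events")

  consumer.peek(10.seconds).foreach {
    case (_, _, _, bytes) =>
      EventMessage.parse(new String(bytes, StandardCharsets.UTF_8)).body match {
        case a: Activation => println(s"activation ${a.name}: status=${a.statusCode}, duration=${a.duration}ms")
        case m: Metric     => println(s"metric ${m.metricName} = ${m.metricValue}")
      }
  }
  consumer.commit()
  consumer.close()
}
```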
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 6054bda..62fa019 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -58,4 +58,7 @@ whisk {
         client-auth = "{{ controller.ssl.clientAuth }}"
       }
     }
+    user-events {
+        enabled = {{ user_events }}
+    }
 }
diff --git a/tests/src/test/scala/whisk/common/UserEventTests.scala b/tests/src/test/scala/whisk/common/UserEventTests.scala
new file mode 100644
index 0000000..661b07e
--- /dev/null
+++ b/tests/src/test/scala/whisk/common/UserEventTests.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import java.nio.charset.StandardCharsets
+
+import akka.actor.ActorSystem
+import common._
+import common.rest.WskRest
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import whisk.connector.kafka.KafkaConsumerConnector
+import whisk.core.WhiskConfig
+import whisk.core.connector.{Activation, EventMessage, Metric}
+
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class UserEventTests extends FlatSpec with Matchers with WskTestHelpers with StreamLogging with BeforeAndAfterAll {
+
+  implicit val wskprops = WskProps()
+  implicit val system = ActorSystem("UserEventTestSystem")
+  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
+
+  val wsk = new WskRest
+
+  val groupid = "kafkatest"
+  val topic = "events"
+  val maxPollInterval = 10.seconds
+
+  val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
+  val testActionsDir = WhiskProperties.getFileRelativeToWhiskHome("tests/dat/actions")
+  behavior of "UserEvents"
+
+  override def afterAll() {
+    consumer.close()
+  }
+
+  if (UserEvents.enabled) {
+    it should "invoke an action and produce user events" in 
withAssetCleaner(wskprops) { (wp, assetHelper) =>
+      val file = Some(TestUtils.getTestActionFilename("hello.js"))
+      val name = "testUserEvents"
+
+      assetHelper.withCleaner(wsk.action, name, confirmDelete = true) { (action, _) =>
+        action.create(name, file)
+      }
+
+      val run = wsk.action.invoke(name, blocking = true)
+
+      withActivation(wsk.activation, run) { result =>
+        withClue("invoking an action was unsuccessful") {
+          result.response.status shouldBe "success"
+        }
+      }
+      // checking for any metrics to arrive
+      val received =
+        consumer.peek(maxPollInterval).map {
+          case (_, _, _, msg) => EventMessage.parse(new String(msg, StandardCharsets.UTF_8))
+        }
+      received.map(event => {
+        event.body match {
+          case a: Activation =>
+            Seq(a.statusCode) should contain oneOf (0, 1, 2, 3)
+            event.source should fullyMatch regex "invoker\\d+".r
+          case m: Metric =>
+            Seq(m.metricName) should contain oneOf ("ConcurrentInvocations", 
"ConcurrentRateLimit", "TimedRateLimit")
+            event.source should fullyMatch regex "controller\\d+".r
+        }
+      })
+      // produce at least 2 events - an Activation and a 'ConcurrentInvocations' Metric
+      // >= 2 is due to events that might have potentially occurred in between
+      received.size should be >= 2
+      consumer.commit()
+    }
+
+  }
+
+}
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 0055372..fcde7e7 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -150,7 +150,7 @@ class ContainerProxyTests
 
   /** Creates an inspectable version of the ack method, which records all calls in a buffer */
   def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
-    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId) =>
+    (_: TransactionId, activation: WhiskActivation, _: Boolean, _: InstanceId, _: UUID) =>
       activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
       activation.annotations.get("path") shouldBe 
Some(a.fullyQualifiedName(false).toString.toJson)
       activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
@@ -219,7 +219,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     preWarm(machine)
@@ -255,7 +256,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -302,7 +304,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     preWarm(machine)
 
@@ -340,7 +343,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
 
@@ -372,7 +376,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     machine ! Run(noLogsAction, message)
@@ -402,7 +407,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -437,7 +443,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -476,7 +483,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -506,7 +514,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -535,7 +544,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
@@ -568,7 +578,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized) // first run an activation
     timeout(machine) // times out Ready state so container suspends
@@ -603,7 +614,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
@@ -639,7 +651,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
 
     // Start running the action
@@ -690,7 +703,8 @@ class ContainerProxyTests
 
     val machine =
       childActorOf(
-        ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
+        ContainerProxy
+          .props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout))
     registerCallback(machine)
     run(machine, Uninitialized)
     timeout(machine)
diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index 662c6d7..e9df1d5 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -67,7 +67,7 @@ protected trait ControllerTestCommon
   override implicit val actorSystem = system // defined in ScalatestRouteTest
   override val executionContext = actorSystem.dispatcher
 
-  override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties)
+  override val whiskConfig = new WhiskConfig(RestApiCommons.requiredProperties ++ WhiskConfig.kafkaHosts)
   assert(whiskConfig.isValid)
 
   // initialize runtimes manifest
@@ -75,7 +75,8 @@ protected trait ControllerTestCommon
 
   override val loadBalancer = new DegenerateLoadBalancerService(whiskConfig)
 
-  override lazy val entitlementProvider: EntitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer)
+  override lazy val entitlementProvider: EntitlementProvider =
+    new LocalEntitlementProvider(whiskConfig, loadBalancer, instance)
 
   override val activationIdFactory = new ActivationId.ActivationIdGenerator() {
     // need a static activation id to test activations api
diff --git a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index 59d4b8e..97d5999 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -1729,7 +1729,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
   }
 
   class TestingEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer)
-      extends EntitlementProvider(config, loadBalancer) {
+      extends EntitlementProvider(config, loadBalancer, InstanceId(0)) {
 
     protected[core] override def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {
       val subject = user.subject
