This is an automated email from the ASF dual-hosted git repository.

neerajmangal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b746f  Recording metrics for action result response statusCode in 
user-events service.  (#4726)
39b746f is described below

commit 39b746f08ad872016b81e7d4950f900f7231d11b
Author: Neeraj Mangal <[email protected]>
AuthorDate: Sat Feb 22 22:39:49 2020 +0530

    Recording metrics for action result response statusCode in user-events 
service.  (#4726)
    
    * add error status in activation event
    
    * Readme changes and Tests
    
    * capture statusCode in response instead of error only
    
    * Fix scalafmt issues
    
    * fix scalafmt errors
    
    * Fix tests
    
    * Fix tests
    
    * incorporate review feedback
    
    * Enhance tests
    
    * Fix Merge issues due to kamon version refine->withTag/s
---
 .../apache/openwhisk/core/connector/Message.scala  | 34 ++++++++++--
 .../core/monitoring/metrics/KamonRecorder.scala    |  8 +++
 .../core/monitoring/metrics/MetricNames.scala      |  2 +
 .../monitoring/metrics/PrometheusRecorder.scala    | 11 ++++
 docs/metrics.md                                    |  1 +
 .../core/connector/test/EventMessageTests.scala    | 38 +++++++++++++
 .../core/entity/test/ActivationCompatTests.scala   | 63 ++++++++++++++++++++++
 7 files changed, 152 insertions(+), 5 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index ecb812c..fdd024d 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -17,13 +17,17 @@
 
 package org.apache.openwhisk.core.connector
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import spray.json._
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.entity._
+
 import scala.concurrent.duration._
+import akka.http.scaladsl.model.StatusCodes._
 import java.util.concurrent.TimeUnit
-import org.apache.openwhisk.core.entity.ActivationResponse.statusForCode
+
+import org.apache.openwhisk.core.entity.ActivationResponse.{statusForCode, 
ERROR_FIELD}
+import org.apache.openwhisk.utils.JsHelpers
 
 /** Basic trait for messages that are sent on a message bus connector. */
 trait Message {
@@ -293,7 +297,8 @@ case class Activation(name: String,
                       conductor: Boolean,
                       memory: Int,
                       causedBy: Option[String],
-                      size: Option[Int] = None)
+                      size: Option[Int] = None,
+                      userDefinedStatusCode: Option[Int] = None)
     extends EventMessageBody {
   val typeName = Activation.typeName
   override def serialize = toJson.compactPrint
@@ -314,6 +319,7 @@ case class Activation(name: String,
 object Activation extends DefaultJsonProtocol {
 
   val typeName = "Activation"
+
   def parse(msg: String) = Try(activationFormat.read(msg.parseJson))
 
   private implicit val durationFormat = new RootJsonFormat[Duration] {
@@ -340,7 +346,24 @@ object Activation extends DefaultJsonProtocol {
       "conductor",
       "memory",
       "causedBy",
-      "size")
+      "size",
+      "userDefinedStatusCode")
+
+  /**
+   * Extracts the "statusCode" set by the action developer in the result response,
+   * looking at `error.statusCode` first and then at the top-level `statusCode`.
+   * Returns None when the result is absent, is not a JSON object, or has no such field;
+   * a value that cannot be converted to an integer is reported as BadRequest (400).
+   */
+  def userDefinedStatusCode(result: Option[JsValue]): Option[Int] =
+    result
+      .collect { case obj: JsObject => obj }
+      .flatMap { obj =>
+        JsHelpers
+          .getFieldPath(obj, ERROR_FIELD, "statusCode")
+          .orElse(JsHelpers.getFieldPath(obj, "statusCode"))
+      }
+      .map(value => Try(value.convertTo[BigInt].intValue()).getOrElse(BadRequest.intValue))
 
   /** Constructs an "Activation" event from a WhiskActivation */
   def from(a: WhiskActivation): Try[Activation] = {
@@ -363,7 +386,8 @@ object Activation extends DefaultJsonProtocol {
           .map(_.memory.megabytes)
           .getOrElse(0),
         
a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption,
-        a.response.size)
+        a.response.size,
+        userDefinedStatusCode(a.response.result))
     }
   }
 
diff --git 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index f371cc5..6301349 100644
--- 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++ 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -35,6 +35,7 @@ trait KamonMetricNames extends MetricNames {
   val durationMetric = "openwhisk.action.duration"
   val responseSizeMetric = "openwhisk.action.responseSize"
   val statusMetric = "openwhisk.action.status"
+  val userDefinedStatusCodeMetric = "openwhisk.action.statusCode"
 
   val concurrentLimitMetric = "openwhisk.action.limit.concurrent"
   val timedLimitMetric = "openwhisk.action.limit.timed"
@@ -103,6 +104,7 @@ object KamonRecorder extends MetricRecorder with 
KamonMetricNames with SLF4JLogg
     private val initTime = Kamon.histogram(initTimeMetric, 
MeasurementUnit.time.milliseconds).withTags(tags)
     private val duration = Kamon.histogram(durationMetric, 
MeasurementUnit.time.milliseconds).withTags(tags)
     private val responseSize = Kamon.histogram(responseSizeMetric, 
MeasurementUnit.information.bytes).withTags(tags)
+    private val userDefinedStatusCode = 
Kamon.counter(userDefinedStatusCodeMetric).withTags(tags)
 
     def record(a: Activation, metricConfig: MetricConfig): Unit = {
       namespaceActivations.increment()
@@ -128,6 +130,12 @@ object KamonRecorder extends MetricRecorder with 
KamonMetricNames with SLF4JLogg
       Kamon.counter(statusMetric).withTags(tags.withTag("status", 
a.status)).increment()
 
       a.size.foreach(responseSize.record(_))
+      a.userDefinedStatusCode.foreach(
+        value =>
+          Kamon
+            .counter(userDefinedStatusCodeMetric)
+            .withTags(tags.withTag("userDefinedStatusCode", value.toString))
+            .increment())
     }
   }
 }
diff --git 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
index 60d6d51..f646489 100644
--- 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
+++ 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
@@ -24,6 +24,7 @@ trait MetricNames {
   val actionStatus = "status"
   val actionMemory = "memory"
   val actionKind = "kind"
+  val userDefinedStatusCode = "status_code"
 
   def activationMetric: String
   def coldStartMetric: String
@@ -32,6 +33,7 @@ trait MetricNames {
   def durationMetric: String
   def statusMetric: String
   def responseSizeMetric: String
+  def userDefinedStatusCodeMetric: String
 
   def concurrentLimitMetric: String
   def timedLimitMetric: String
diff --git 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index 9cf7a22..c498c6f 100644
--- 
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ 
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -46,6 +46,7 @@ trait PrometheusMetricNames extends MetricNames {
   val responseSizeMetric = "openwhisk_action_response_size_bytes"
   val statusMetric = "openwhisk_action_status"
   val memoryMetric = "openwhisk_action_memory"
+  val userDefinedStatusCodeMetric = "openwhisk_action_status_code"
 
   val concurrentLimitMetric = "openwhisk_action_limit_concurrent_total"
   val timedLimitMetric = "openwhisk_action_limit_timed_total"
@@ -153,6 +154,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
       }
 
       a.size.foreach(responseSize.observe(_))
+      a.userDefinedStatusCode.foreach(value =>
+        userDefinedStatusCodeCounter.labels(namespace, initiator, action, 
value.toString).inc())
     }
   }
 
@@ -210,6 +213,14 @@ object PrometheusRecorder extends PrometheusMetricNames {
       initiatorNamespace,
       actionName,
       actionStatus)
+  private val userDefinedStatusCodeCounter =
+    counter(
+      userDefinedStatusCodeMetric,
+      "status code returned in action result response set by developer",
+      actionNamespace,
+      initiatorNamespace,
+      actionName,
+      userDefinedStatusCode)
   private val waitTimeHisto =
     histogram(waitTimeMetric, "Internal system hold time", actionNamespace, 
initiatorNamespace, actionName)
   private val initTimeHisto =
diff --git a/docs/metrics.md b/docs/metrics.md
index 0c94b89..8bb3ddc 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -315,6 +315,7 @@ conductor - true for conductor backed actions
 memory - maximum memory allowed for action container
 causedBy - contains the "causedBy" annotation (can be "sequence" or nothing at 
the moment)
 size - size (in bytes) of the invocation response
+userDefinedStatusCode - the `statusCode` set by the action developer in the result 
response (this field is absent when no status code was set)
 ```
 Metric is any user specific event produced by the system and it at this moment 
includes the following information:
 ```
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
index 15653e4..56c4396 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
@@ -90,5 +90,43 @@ class EventMessageTests extends FlatSpec with Matchers {
       Activation("ns2/a", 0, toDuration(0), toDuration(0), toDuration(0), 
"testkind", false, 0, None, Some(42)))
   }
 
+  it should "Transform a activation with status code" in {
+    val resultWithError =
+      """
+        |{
+        | "statusCode" : 404,
+        | "body": "Requested resource not found"
+        |}
+        |""".stripMargin.parseJson
+    val a =
+      fullActivation
+        .copy(response = ActivationResponse.applicationError(resultWithError, 
Some(42)))
+    Activation.from(a).map(act => act.userDefinedStatusCode) shouldBe 
Success(Some(404))
+  }
+
+  it should "Transform a activation with error status code" in {
+    val resultWithError =
+      """
+        |{
+        | "error": {
+        |   "statusCode" : "404",
+        |   "body": "Requested resource not found"
+        | }
+        |}
+        |""".stripMargin.parseJson
+    Activation.userDefinedStatusCode(Some(resultWithError)) shouldBe Some(404)
+  }
+
+  it should "Transform a activation with error status code with invalid error 
code" in {
+    val resultWithInvalidError =
+      """
+        |{
+        |   "statusCode" : "i404",
+        |   "body": "Requested resource not found"
+        |}
+        |""".stripMargin.parseJson
+    Activation.userDefinedStatusCode(Some(resultWithInvalidError)) shouldBe 
Some(400)
+  }
+
   def toDuration(milliseconds: Long) = new FiniteDuration(milliseconds, 
TimeUnit.MILLISECONDS)
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
index de14a2e..659a604 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
@@ -96,6 +96,52 @@ class ActivationCompatTests extends FlatSpec with Matchers 
with WhiskInstants wi
     |  "version": "0.0.1"
     |}""".stripMargin.parseJson
 
+  val whiskActivationErrorJs = """
+    |{
+    |  "activationId": "be97c2fed5dc43d097c2fed5dc73d085",
+    |  "annotations": [{
+    |    "key": "causedBy",
+    |    "value": "sequence"
+    |  }, {
+    |    "key": "path",
+    |    "value": "ns2/a"
+    |  }, {
+    |    "key": "waitTime",
+    |    "value": 5
+    |  }, {
+    |    "key": "kind",
+    |    "value": "testkind"
+    |  }, {
+    |    "key": "limits",
+    |    "value": {
+    |      "concurrency": 1,
+    |      "memory": 128,
+    |      "timeout": 1000
+    |    }
+    |  }, {
+    |    "key": "initTime",
+    |    "value": 10
+    |  }],
+    |  "duration": 123,
+    |  "end": 1570013740007,
+    |  "logs": [],
+    |  "name": "a",
+    |  "namespace": "ns",
+    |  "publish": false,
+    |  "response": {
+    |    "result": {
+    |      "error": {
+    |         "statusCode": 404,
+    |         "body": "Requested resource not found"
+    |      }
+    |    },
+    |    "statusCode": 0
+    |  },
+    |  "start": 1570013740005,
+    |  "subject": "anon-HfJWZZSG9YE38Y8DJbgp9Xn0YyN",
+    |  "version": "0.0.1"
+    |}""".stripMargin.parseJson
+
   val activationJs = """
      |{
      |  "causedBy": "sequence",
@@ -109,10 +155,27 @@ class ActivationCompatTests extends FlatSpec with 
Matchers with WhiskInstants wi
      |  "waitTime": 5
      |}""".stripMargin.parseJson
 
+  val activationWithActionStatusCodeJs =
+    """
+      |{
+      |  "userDefinedStatusCode": 404,
+      |  "causedBy": "sequence",
+      |  "conductor": false,
+      |  "duration": 123,
+      |  "initTime": 10,
+      |  "kind": "testkind",
+      |  "memory": 128,
+      |  "name": "ns2/a",
+      |  "statusCode": 0,
+      |  "waitTime": 5
+      |}""".stripMargin.parseJson
+
   it should "deserialize without error" in {
     val activationResponse = 
ActivationResponse.serdes.read(activationResponseJs)
     val whiskActivation = WhiskActivation.serdes.read(whiskActivationJs)
     val activation = Activation.activationFormat.read(activationJs)
+    val whiskActivationWithError = 
WhiskActivation.serdes.read(whiskActivationErrorJs)
+    val activationWithActionStatus = 
Activation.activationFormat.read(activationWithActionStatusCodeJs)
   }
 
   def generateJsons(): Unit = {

Reply via email to