This is an automated email from the ASF dual-hosted git repository.
neerajmangal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 39b746f Recording metrics for action result response statusCode in
user-events service. (#4726)
39b746f is described below
commit 39b746f08ad872016b81e7d4950f900f7231d11b
Author: Neeraj Mangal <[email protected]>
AuthorDate: Sat Feb 22 22:39:49 2020 +0530
Recording metrics for action result response statusCode in user-events
service. (#4726)
* add error status in activation event
* Readme changes and Tests
* capture statusCode in response instead of error only
* Fix scalafmt issues
* fix scalafmt errors
* Fix tests
* Fix tests
* incorporate review feedback
* Enhance tests
* Fix Merge issues due to kamon version refine->withTag/s
---
.../apache/openwhisk/core/connector/Message.scala | 34 ++++++++++--
.../core/monitoring/metrics/KamonRecorder.scala | 8 +++
.../core/monitoring/metrics/MetricNames.scala | 2 +
.../monitoring/metrics/PrometheusRecorder.scala | 11 ++++
docs/metrics.md | 1 +
.../core/connector/test/EventMessageTests.scala | 38 +++++++++++++
.../core/entity/test/ActivationCompatTests.scala | 63 ++++++++++++++++++++++
7 files changed, 152 insertions(+), 5 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
index ecb812c..fdd024d 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
@@ -17,13 +17,17 @@
package org.apache.openwhisk.core.connector
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import spray.json._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.entity._
+
import scala.concurrent.duration._
+import akka.http.scaladsl.model.StatusCodes._
import java.util.concurrent.TimeUnit
-import org.apache.openwhisk.core.entity.ActivationResponse.statusForCode
+
+import org.apache.openwhisk.core.entity.ActivationResponse.{statusForCode,
ERROR_FIELD}
+import org.apache.openwhisk.utils.JsHelpers
/** Basic trait for messages that are sent on a message bus connector. */
trait Message {
@@ -293,7 +297,8 @@ case class Activation(name: String,
conductor: Boolean,
memory: Int,
causedBy: Option[String],
- size: Option[Int] = None)
+ size: Option[Int] = None,
+ userDefinedStatusCode: Option[Int] = None)
extends EventMessageBody {
val typeName = Activation.typeName
override def serialize = toJson.compactPrint
@@ -314,6 +319,7 @@ case class Activation(name: String,
object Activation extends DefaultJsonProtocol {
val typeName = "Activation"
+
def parse(msg: String) = Try(activationFormat.read(msg.parseJson))
private implicit val durationFormat = new RootJsonFormat[Duration] {
@@ -340,7 +346,24 @@ object Activation extends DefaultJsonProtocol {
"conductor",
"memory",
"causedBy",
- "size")
+ "size",
+ "userDefinedStatusCode")
+
+ /** Gets the "statusCode" set by the action developer, preferring one nested under the error field.
+  * None if the result is absent, not a JSON object or has no code; 400 if the code cannot be converted. */
+ def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = {
+   // Match on the shape: result.get.asJsObject threw for None and for non-object results.
+   val statusCode = result.collect { case obj: JsObject => obj }.flatMap { obj =>
+     JsHelpers.getFieldPath(obj, ERROR_FIELD, "statusCode").orElse(JsHelpers.getFieldPath(obj, "statusCode"))
+   }
+
+   statusCode.map { value =>
+     Try { value.convertTo[BigInt].intValue() } match {
+       case Failure(_)    => BadRequest.intValue
+       case Success(code) => code
+     }
+   }
+ }
/** Constructs an "Activation" event from a WhiskActivation */
def from(a: WhiskActivation): Try[Activation] = {
@@ -363,7 +386,8 @@ object Activation extends DefaultJsonProtocol {
.map(_.memory.megabytes)
.getOrElse(0),
a.annotations.getAs[String](WhiskActivation.causedByAnnotation).toOption,
- a.response.size)
+ a.response.size,
+ userDefinedStatusCode(a.response.result))
}
}
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index f371cc5..6301349 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -35,6 +35,7 @@ trait KamonMetricNames extends MetricNames {
val durationMetric = "openwhisk.action.duration"
val responseSizeMetric = "openwhisk.action.responseSize"
val statusMetric = "openwhisk.action.status"
+ val userDefinedStatusCodeMetric = "openwhisk.action.statusCode"
val concurrentLimitMetric = "openwhisk.action.limit.concurrent"
val timedLimitMetric = "openwhisk.action.limit.timed"
@@ -103,6 +104,7 @@ object KamonRecorder extends MetricRecorder with
KamonMetricNames with SLF4JLogg
private val initTime = Kamon.histogram(initTimeMetric,
MeasurementUnit.time.milliseconds).withTags(tags)
private val duration = Kamon.histogram(durationMetric,
MeasurementUnit.time.milliseconds).withTags(tags)
private val responseSize = Kamon.histogram(responseSizeMetric,
MeasurementUnit.information.bytes).withTags(tags)
+ private val userDefinedStatusCode =
Kamon.counter(userDefinedStatusCodeMetric).withTags(tags)
def record(a: Activation, metricConfig: MetricConfig): Unit = {
namespaceActivations.increment()
@@ -128,6 +130,12 @@ object KamonRecorder extends MetricRecorder with
KamonMetricNames with SLF4JLogg
Kamon.counter(statusMetric).withTags(tags.withTag("status",
a.status)).increment()
a.size.foreach(responseSize.record(_))
+ a.userDefinedStatusCode.foreach(
+ value =>
+ Kamon
+ .counter(userDefinedStatusCodeMetric)
+ .withTags(tags.withTag("userDefinedStatusCode", value.toString))
+ .increment())
}
}
}
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
index 60d6d51..f646489 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/MetricNames.scala
@@ -24,6 +24,7 @@ trait MetricNames {
val actionStatus = "status"
val actionMemory = "memory"
val actionKind = "kind"
+ val userDefinedStatusCode = "status_code"
def activationMetric: String
def coldStartMetric: String
@@ -32,6 +33,7 @@ trait MetricNames {
def durationMetric: String
def statusMetric: String
def responseSizeMetric: String
+ def userDefinedStatusCodeMetric: String
def concurrentLimitMetric: String
def timedLimitMetric: String
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index 9cf7a22..c498c6f 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -46,6 +46,7 @@ trait PrometheusMetricNames extends MetricNames {
val responseSizeMetric = "openwhisk_action_response_size_bytes"
val statusMetric = "openwhisk_action_status"
val memoryMetric = "openwhisk_action_memory"
+ val userDefinedStatusCodeMetric = "openwhisk_action_status_code"
val concurrentLimitMetric = "openwhisk_action_limit_concurrent_total"
val timedLimitMetric = "openwhisk_action_limit_timed_total"
@@ -153,6 +154,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
}
a.size.foreach(responseSize.observe(_))
+ a.userDefinedStatusCode.foreach(value =>
+ userDefinedStatusCodeCounter.labels(namespace, initiator, action,
value.toString).inc())
}
}
@@ -210,6 +213,14 @@ object PrometheusRecorder extends PrometheusMetricNames {
initiatorNamespace,
actionName,
actionStatus)
+ private val userDefinedStatusCodeCounter =
+ counter(
+ userDefinedStatusCodeMetric,
+ "status code returned in action result response set by developer",
+ actionNamespace,
+ initiatorNamespace,
+ actionName,
+ userDefinedStatusCode)
private val waitTimeHisto =
histogram(waitTimeMetric, "Internal system hold time", actionNamespace,
initiatorNamespace, actionName)
private val initTimeHisto =
diff --git a/docs/metrics.md b/docs/metrics.md
index 0c94b89..8bb3ddc 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -315,6 +315,7 @@ conductor - true for conductor backed actions
memory - maximum memory allowed for action container
causedBy - contains the "causedBy" annotation (can be "sequence" or nothing at
the moment)
size - size (in bytes) of the invocation response
+userDefinedStatusCode - the `statusCode` (or `error.statusCode`) set by the
action in its result response (this field is omitted when no status code is set)
```
Metric is any user specific event produced by the system and it at this moment
includes the following information:
```
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
index 15653e4..56c4396 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/connector/test/EventMessageTests.scala
@@ -90,5 +90,43 @@ class EventMessageTests extends FlatSpec with Matchers {
Activation("ns2/a", 0, toDuration(0), toDuration(0), toDuration(0),
"testkind", false, 0, None, Some(42)))
}
+ it should "Transform a activation with status code" in {
+ val resultWithError =
+ """
+ |{
+ | "statusCode" : 404,
+ | "body": "Requested resource not found"
+ |}
+ |""".stripMargin.parseJson
+ val a =
+ fullActivation
+ .copy(response = ActivationResponse.applicationError(resultWithError,
Some(42)))
+ Activation.from(a).map(act => act.userDefinedStatusCode) shouldBe
Success(Some(404))
+ }
+
+ it should "Transform a activation with error status code" in {
+ val resultWithError =
+ """
+ |{
+ | "error": {
+ | "statusCode" : "404",
+ | "body": "Requested resource not found"
+ | }
+ |}
+ |""".stripMargin.parseJson
+ Activation.userDefinedStatusCode(Some(resultWithError)) shouldBe Some(404)
+ }
+
+ it should "Transform a activation with error status code with invalid error
code" in {
+ val resultWithInvalidError =
+ """
+ |{
+ | "statusCode" : "i404",
+ | "body": "Requested resource not found"
+ |}
+ |""".stripMargin.parseJson
+ Activation.userDefinedStatusCode(Some(resultWithInvalidError)) shouldBe
Some(400)
+ }
+
def toDuration(milliseconds: Long) = new FiniteDuration(milliseconds,
TimeUnit.MILLISECONDS)
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
index de14a2e..659a604 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ActivationCompatTests.scala
@@ -96,6 +96,52 @@ class ActivationCompatTests extends FlatSpec with Matchers
with WhiskInstants wi
| "version": "0.0.1"
|}""".stripMargin.parseJson
+ val whiskActivationErrorJs = """
+ |{
+ | "activationId": "be97c2fed5dc43d097c2fed5dc73d085",
+ | "annotations": [{
+ | "key": "causedBy",
+ | "value": "sequence"
+ | }, {
+ | "key": "path",
+ | "value": "ns2/a"
+ | }, {
+ | "key": "waitTime",
+ | "value": 5
+ | }, {
+ | "key": "kind",
+ | "value": "testkind"
+ | }, {
+ | "key": "limits",
+ | "value": {
+ | "concurrency": 1,
+ | "memory": 128,
+ | "timeout": 1000
+ | }
+ | }, {
+ | "key": "initTime",
+ | "value": 10
+ | }],
+ | "duration": 123,
+ | "end": 1570013740007,
+ | "logs": [],
+ | "name": "a",
+ | "namespace": "ns",
+ | "publish": false,
+ | "response": {
+ | "result": {
+ | "error": {
+ | "statusCode": 404,
+ | "body": "Requested resource not found"
+ | }
+ | },
+ | "statusCode": 0
+ | },
+ | "start": 1570013740005,
+ | "subject": "anon-HfJWZZSG9YE38Y8DJbgp9Xn0YyN",
+ | "version": "0.0.1"
+ |}""".stripMargin.parseJson
+
val activationJs = """
|{
| "causedBy": "sequence",
@@ -109,10 +155,27 @@ class ActivationCompatTests extends FlatSpec with
Matchers with WhiskInstants wi
| "waitTime": 5
|}""".stripMargin.parseJson
+ val activationWithActionStatusCodeJs =
+ """
+ |{
+ | "userDefinedStatusCode": 404,
+ | "causedBy": "sequence",
+ | "conductor": false,
+ | "duration": 123,
+ | "initTime": 10,
+ | "kind": "testkind",
+ | "memory": 128,
+ | "name": "ns2/a",
+ | "statusCode": 0,
+ | "waitTime": 5
+ |}""".stripMargin.parseJson
+
it should "deserialize without error" in {
val activationResponse =
ActivationResponse.serdes.read(activationResponseJs)
val whiskActivation = WhiskActivation.serdes.read(whiskActivationJs)
val activation = Activation.activationFormat.read(activationJs)
+ val whiskActivationWithError =
WhiskActivation.serdes.read(whiskActivationErrorJs)
+ val activationWithActionStatus =
Activation.activationFormat.read(activationWithActionStatusCodeJs)
}
def generateJsons(): Unit = {