This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new c674757 Allow namespace ignore in user-events (#4668)
c674757 is described below
commit c674757c3e774368873103d40dc21eff1ae051f1
Author: Cosmin Stanciu <[email protected]>
AuthorDate: Tue Nov 5 22:44:38 2019 -0800
Allow namespace ignore in user-events (#4668)
Enable support for ignoring action level metrics for certain namespaces
which are used for test purposes
Fixes #4667
---
core/monitoring/user-events/README.md | 9 +++
.../user-events/src/main/resources/reference.conf | 3 +
.../core/monitoring/metrics/EventConsumer.scala | 11 ++--
.../core/monitoring/metrics/KamonRecorder.scala | 28 ++++++--
.../core/monitoring/metrics/OpenWhiskEvents.scala | 4 +-
.../monitoring/metrics/PrometheusRecorder.scala | 30 ++++++---
.../resources/application.conf} | 18 ++---
.../core/monitoring/metrics/EventsTestHelper.scala | 5 +-
.../monitoring/metrics/KamonRecorderTests.scala | 73 ++++++++++++--------
.../metrics/PrometheusRecorderTests.scala | 77 ++++++++++++++--------
10 files changed, 172 insertions(+), 86 deletions(-)
diff --git a/core/monitoring/user-events/README.md
b/core/monitoring/user-events/README.md
index 5ed2127..ff6730a 100644
--- a/core/monitoring/user-events/README.md
+++ b/core/monitoring/user-events/README.md
@@ -36,6 +36,15 @@ This service connects to `events` topic and publishes the
events to various serv
The service needs the following env variables to be set
- `KAFKA_HOSTS` - For local env it can be set to `172.17.0.1:9093`. When using
[OpenWhisk Devtools][2] based setup use `kafka`
+- Namespaces can be removed from reports by listing them inside the
`reference.conf` using the `whisk.user-events.ignored-namespaces` configuration.
+e.g:
+```
+whisk {
+ user-events {
+ ignored-namespaces = ["canary","testing"]
+ }
+}
+```
Integrations
------------
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf
b/core/monitoring/user-events/src/main/resources/reference.conf
index 6f7d1c2..6282614 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/main/resources/reference.conf
@@ -23,5 +23,8 @@ whisk {
# Enables KamonRecorder so as to enable sending metrics to Kamon supported
backends
# like DataDog
enable-kamon = false
+
+ # Namespaces that should not be monitored
+ ignored-namespaces = []
}
}
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index 65f245e..a6eca9b 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -39,15 +39,16 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.openwhisk.core.connector.{Activation, EventMessage, Metric}
import org.apache.openwhisk.core.entity.ActivationResponse
+import
org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
trait MetricRecorder {
- def processActivation(activation: Activation, initiatorNamespace: String):
Unit
+ def processActivation(activation: Activation, initiatorNamespace: String,
metricConfig: MetricConfig): Unit
def processMetric(metric: Metric, initiatorNamespace: String): Unit
}
-case class EventConsumer(settings: ConsumerSettings[String, String],
recorders: Seq[MetricRecorder])(
- implicit system: ActorSystem,
- materializer: ActorMaterializer)
+case class EventConsumer(settings: ConsumerSettings[String, String],
+ recorders: Seq[MetricRecorder],
+ metricConfig: MetricConfig)(implicit system:
ActorSystem, materializer: ActorMaterializer)
extends KafkaMetricsProvider {
import EventConsumer._
@@ -110,7 +111,7 @@ case class EventConsumer(settings: ConsumerSettings[String,
String], recorders:
.foreach { e =>
e.body match {
case a: Activation =>
- recorders.foreach(_.processActivation(a, e.namespace))
+ recorders.foreach(_.processActivation(a, e.namespace,
metricConfig))
updateGlobalMetrics(a)
case m: Metric =>
recorders.foreach(_.processMetric(m, e.namespace))
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index 34af567..c2e785b 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -21,10 +21,12 @@ import akka.event.slf4j.SLF4JLogging
import org.apache.openwhisk.core.connector.{Activation, Metric}
import kamon.Kamon
import kamon.metric.MeasurementUnit
+import
org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import scala.collection.concurrent.TrieMap
trait KamonMetricNames extends MetricNames {
+ val namespaceActivationMetric = "openwhisk.namespace.activations"
val activationMetric = "openwhisk.action.activations"
val coldStartMetric = "openwhisk.action.coldStarts"
val waitTimeMetric = "openwhisk.action.waitTime"
@@ -41,23 +43,23 @@ object KamonRecorder extends MetricRecorder with
KamonMetricNames with SLF4JLogg
private val activationMetrics = new TrieMap[String, ActivationKamonMetrics]
private val limitMetrics = new TrieMap[String, LimitKamonMetrics]
- override def processActivation(activation: Activation, initiatorNamespace:
String): Unit = {
- lookup(activation, initiatorNamespace).record(activation)
+ override def processActivation(activation: Activation, initiator: String,
metricConfig: MetricConfig): Unit = {
+ lookup(activation, initiator).record(activation, metricConfig)
}
- override def processMetric(metric: Metric, initiatorNamespace: String): Unit
= {
- val limitMetric = limitMetrics.getOrElseUpdate(initiatorNamespace,
LimitKamonMetrics(initiatorNamespace))
+ override def processMetric(metric: Metric, initiator: String): Unit = {
+ val limitMetric = limitMetrics.getOrElseUpdate(initiator,
LimitKamonMetrics(initiator))
limitMetric.record(metric)
}
- def lookup(activation: Activation, initiatorNamespace: String):
ActivationKamonMetrics = {
+ def lookup(activation: Activation, initiator: String):
ActivationKamonMetrics = {
val name = activation.name
val kind = activation.kind
val memory = activation.memory.toString
val namespace = activation.namespace
val action = activation.action
activationMetrics.getOrElseUpdate(name, {
- ActivationKamonMetrics(namespace, action, kind, memory,
initiatorNamespace)
+ ActivationKamonMetrics(namespace, action, kind, memory, initiator)
})
}
@@ -87,8 +89,11 @@ object KamonRecorder extends MetricRecorder with
KamonMetricNames with SLF4JLogg
`actionName` -> action,
`actionKind` -> kind,
`actionMemory` -> memory)
+ private val namespaceActivationsTags =
+ Map(`actionNamespace` -> namespace, `initiatorNamespace` -> initiator)
private val tags = Map(`actionNamespace` -> namespace,
`initiatorNamespace` -> initiator, `actionName` -> action)
+ private val namespaceActivations =
Kamon.counter(namespaceActivationMetric).refine(namespaceActivationsTags)
private val activations =
Kamon.counter(activationMetric).refine(activationTags)
private val coldStarts = Kamon.counter(coldStartMetric).refine(tags)
private val waitTime = Kamon.histogram(waitTimeMetric,
MeasurementUnit.time.milliseconds).refine(tags)
@@ -96,7 +101,16 @@ object KamonRecorder extends MetricRecorder with
KamonMetricNames with SLF4JLogg
private val duration = Kamon.histogram(durationMetric,
MeasurementUnit.time.milliseconds).refine(tags)
private val responseSize = Kamon.histogram(responseSizeMetric,
MeasurementUnit.information.bytes).refine(tags)
- def record(a: Activation): Unit = {
+ def record(a: Activation, metricConfig: MetricConfig): Unit = {
+ namespaceActivations.increment()
+
+ // only record activation if not executed in an ignored namespace
+ if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
+ recordActivation(a)
+ }
+ }
+
+ def recordActivation(a: Activation): Unit = {
activations.increment()
if (a.isColdStart) {
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
index f5c7ce6..49ddb2c 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
@@ -33,7 +33,7 @@ import scala.concurrent.{ExecutionContext, Future}
object OpenWhiskEvents extends SLF4JLogging {
- case class MetricConfig(port: Int, enableKamon: Boolean)
+ case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces:
Set[String])
def start(config: Config)(implicit system: ActorSystem,
materializer: ActorMaterializer):
Future[Http.ServerBinding] = {
@@ -47,7 +47,7 @@ object OpenWhiskEvents extends SLF4JLogging {
val prometheusRecorder = PrometheusRecorder(prometheusReporter)
val recorders = if (metricConfig.enableKamon) Seq(prometheusRecorder,
KamonRecorder) else Seq(prometheusRecorder)
- val eventConsumer =
EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders)
+ val eventConsumer =
EventConsumer(eventConsumerSettings(defaultConsumerConfig(config)), recorders,
metricConfig)
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind,
"shutdownConsumer") { () =>
eventConsumer.shutdown()
diff --git
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index 516a91d..9cf7a22 100644
---
a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++
b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -29,6 +29,7 @@ import org.apache.openwhisk.core.connector.{Activation,
Metric}
import io.prometheus.client.exporter.common.TextFormat
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram}
import kamon.prometheus.PrometheusReporter
+import
org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
import org.apache.openwhisk.core.entity.{ActivationEntityLimit,
ActivationResponse}
import scala.collection.JavaConverters._
@@ -36,6 +37,7 @@ import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.Duration
trait PrometheusMetricNames extends MetricNames {
+ val namespaceMetric = "openwhisk_namespace_activations_total"
val activationMetric = "openwhisk_action_activations_total"
val coldStartMetric = "openwhisk_action_coldStarts_total"
val waitTimeMetric = "openwhisk_action_waitTime_seconds"
@@ -57,19 +59,19 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
private val activationMetrics = new TrieMap[String, ActivationPromMetrics]
private val limitMetrics = new TrieMap[String, LimitPromMetrics]
- override def processActivation(activation: Activation, initiatorNamespace:
String): Unit = {
- lookup(activation, initiatorNamespace).record(activation)
+ override def processActivation(activation: Activation, initiator: String,
metricConfig: MetricConfig): Unit = {
+ lookup(activation, initiator).record(activation, initiator, metricConfig)
}
- override def processMetric(metric: Metric, initiatorNamespace: String): Unit
= {
- val limitMetric = limitMetrics.getOrElseUpdate(initiatorNamespace,
LimitPromMetrics(initiatorNamespace))
+ override def processMetric(metric: Metric, initiator: String): Unit = {
+ val limitMetric = limitMetrics.getOrElseUpdate(initiator,
LimitPromMetrics(initiator))
limitMetric.record(metric)
}
override def getReport(): MessageEntity =
HttpEntity(PrometheusExporter.textV4, createSource())
- private def lookup(activation: Activation, initiatorNamespace: String):
ActivationPromMetrics = {
+ private def lookup(activation: Activation, initiator: String):
ActivationPromMetrics = {
//TODO Unregister unused actions
val name = activation.name
val kind = activation.kind
@@ -77,7 +79,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
val namespace = activation.namespace
val action = activation.action
activationMetrics.getOrElseUpdate(name, {
- ActivationPromMetrics(namespace, action, kind, memory,
initiatorNamespace)
+ ActivationPromMetrics(namespace, action, kind, memory, initiator)
})
}
@@ -100,6 +102,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
kind: String,
memory: String,
initiatorNamespace: String) {
+ private val namespaceActivations =
namespaceActivationCounter.labels(namespace, initiatorNamespace)
private val activations = activationCounter.labels(namespace,
initiatorNamespace, action, kind, memory)
private val coldStarts = coldStartCounter.labels(namespace,
initiatorNamespace, action)
private val waitTime = waitTimeHisto.labels(namespace, initiatorNamespace,
action)
@@ -118,7 +121,16 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
private val statusInternalError =
statusCounter.labels(namespace, initiatorNamespace, action,
ActivationResponse.statusWhiskError)
- def record(a: Activation): Unit = {
+ def record(a: Activation, initiator: String, metricConfig: MetricConfig):
Unit = {
+ namespaceActivations.inc()
+
+ // only record activation if not executed in an ignored namespace
+ if (!metricConfig.ignoredNamespaces.contains(a.namespace)) {
+ recordActivation(a, initiator)
+ }
+ }
+
+ def recordActivation(a: Activation, initiator: String): Unit = {
gauge.set(a.memory)
activations.inc()
@@ -137,7 +149,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
case ActivationResponse.statusApplicationError =>
statusApplicationError.inc()
case ActivationResponse.statusDeveloperError =>
statusDeveloperError.inc()
case ActivationResponse.statusWhiskError =>
statusInternalError.inc()
- case x =>
statusCounter.labels(namespace, initiatorNamespace, action, x).inc()
+ case x =>
statusCounter.labels(namespace, initiator, action, x).inc()
}
a.size.foreach(responseSize.observe(_))
@@ -177,6 +189,8 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
}
object PrometheusRecorder extends PrometheusMetricNames {
+ private val namespaceActivationCounter =
+ counter(namespaceMetric, "Namespace activations Count", actionNamespace,
initiatorNamespace)
private val activationCounter =
counter(
activationMetric,
diff --git a/core/monitoring/user-events/src/main/resources/reference.conf
b/core/monitoring/user-events/src/test/resources/application.conf
similarity index 76%
copy from core/monitoring/user-events/src/main/resources/reference.conf
copy to core/monitoring/user-events/src/test/resources/application.conf
index 6f7d1c2..f7413dc 100644
--- a/core/monitoring/user-events/src/main/resources/reference.conf
+++ b/core/monitoring/user-events/src/test/resources/application.conf
@@ -15,13 +15,15 @@
# limitations under the License.
#
-whisk {
- user-events {
- # Server port
- port = 9095
+user-events {
+ # Server port
+ port = 9095
+
+ # Enables KamonRecorder so as to enable sending metrics to Kamon supported
backends
+ # like DataDog
+ enable-kamon = false
+
+ # Namespaces that should not be monitored
+ ignored-namespaces = ["guest"]
- # Enables KamonRecorder so as to enable sending metrics to Kamon supported
backends
- # like DataDog
- enable-kamon = false
- }
}
diff --git
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
index 71b8d2e..6f2edae 100644
---
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
+++
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/EventsTestHelper.scala
@@ -23,6 +23,8 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import kamon.prometheus.PrometheusReporter
+import
org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents.MetricConfig
+import pureconfig.loadConfigOrThrow
trait EventsTestHelper {
@@ -34,7 +36,8 @@ trait EventsTestHelper {
val settings = OpenWhiskEvents
.eventConsumerSettings(OpenWhiskEvents.defaultConsumerConfig(globalConfig))
.withBootstrapServers(s"localhost:$kport")
- EventConsumer(settings, Seq(recorder))
+ val metricConfig = loadConfigOrThrow[MetricConfig](globalConfig,
"user-events")
+ EventConsumer(settings, Seq(recorder), metricConfig)
}
protected def freePort(): Int = {
diff --git
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
index 8e09a70..446ed98 100644
---
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
+++
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorderTests.scala
@@ -56,8 +56,9 @@ class KamonRecorderTests extends KafkaSpecBase with
BeforeAndAfterEach with Kamo
behavior of "KamonConsumer"
- val namespace = "whisk.system"
- val initiator = "testNS"
+ val initiator = "initiatorTest"
+ val namespaceDemo = "demo"
+ val namespaceGuest = "guest"
val actionWithCustomPackage = "apimgmt/createApi"
val actionWithDefaultPackage = "createApi"
val kind = "nodejs:10"
@@ -70,43 +71,52 @@ class KamonRecorderTests extends KafkaSpecBase with
BeforeAndAfterEach with Kamo
publishStringMessageToKafka(
EventConsumer.userEventTopic,
- newActivationEvent(s"$namespace/$actionWithCustomPackage").serialize)
+ newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage").serialize)
+ publishStringMessageToKafka(
+ EventConsumer.userEventTopic,
+
newActivationEvent(s"$namespaceDemo/$actionWithDefaultPackage").serialize)
publishStringMessageToKafka(
EventConsumer.userEventTopic,
- newActivationEvent(s"$namespace/$actionWithDefaultPackage").serialize)
+
newActivationEvent(s"$namespaceGuest/$actionWithDefaultPackage").serialize)
sleep(sleepAfterProduce, "sleeping post produce")
consumer.shutdown().futureValue
sleep(4.second, "sleeping for Kamon reporters to get invoked")
// Custom package
- TestReporter.counter(activationMetric, actionWithCustomPackage).size
shouldBe 1
+ TestReporter.counter(activationMetric, namespaceDemo,
actionWithCustomPackage)(0).value shouldBe 1
TestReporter
- .counter(activationMetric, actionWithCustomPackage)
- .filter((t) => t.tags.get(actionMemory).get == memory.toString)
- .size shouldBe 1
+ .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
+ .filter((t) => t.tags.get(actionMemory).get == memory.toString)(0)
+ .value shouldBe 1
TestReporter
- .counter(activationMetric, actionWithCustomPackage)
- .filter((t) => t.tags.get(actionKind).get == kind)
- .size shouldBe 1
+ .counter(activationMetric, namespaceDemo, actionWithCustomPackage)
+ .filter((t) => t.tags.get(actionKind).get == kind)(0)
+ .value shouldBe 1
TestReporter
- .counter(statusMetric, actionWithCustomPackage)
- .filter((t) => t.tags.get(actionStatus).get ==
ActivationResponse.statusDeveloperError)
- .size shouldBe 1
- TestReporter.counter(coldStartMetric, actionWithCustomPackage).size
shouldBe 1
- TestReporter.histogram(waitTimeMetric, actionWithCustomPackage).size
shouldBe 1
- TestReporter.histogram(initTimeMetric, actionWithCustomPackage).size
shouldBe 1
- TestReporter.histogram(durationMetric, actionWithCustomPackage).size
shouldBe 1
+ .counter(statusMetric, namespaceDemo, actionWithCustomPackage)
+ .filter((t) => t.tags.get(actionStatus).get ==
ActivationResponse.statusDeveloperError)(0)
+ .value shouldBe 1
+ TestReporter.counter(coldStartMetric, namespaceDemo,
actionWithCustomPackage)(0).value shouldBe 1
+ TestReporter.histogram(waitTimeMetric, namespaceDemo,
actionWithCustomPackage).size shouldBe 1
+ TestReporter.histogram(initTimeMetric, namespaceDemo,
actionWithCustomPackage).size shouldBe 1
+ TestReporter.histogram(durationMetric, namespaceDemo,
actionWithCustomPackage).size shouldBe 1
// Default package
- TestReporter.histogram(durationMetric, actionWithDefaultPackage).size
shouldBe 1
+ TestReporter.histogram(durationMetric, namespaceDemo,
actionWithDefaultPackage).size shouldBe 1
+
+ // Blacklisted namespace should not be tracked
+ TestReporter.counter(activationMetric, namespaceGuest,
actionWithDefaultPackage)(0).value shouldBe 0
+
+ // Blacklisted should be counted in "openwhisk.namespace.activations"
metric
+ TestReporter.namespaceCounter(namespaceActivationMetric,
namespaceGuest)(0).value shouldBe 1
}
- private def newActivationEvent(name: String) =
+ private def newActivationEvent(actionPath: String) =
EventMessage(
- namespace,
- Activation(name, 2, 3.millis, 5.millis, 11.millis, kind, false, memory,
None),
+ "test",
+ Activation(actionPath, 2, 3.millis, 5.millis, 11.millis, kind, false,
memory, None),
Subject("testuser"),
initiator,
UUID("test"),
@@ -126,24 +136,35 @@ class KamonRecorderTests extends KafkaSpecBase with
BeforeAndAfterEach with Kamo
snapshotAccumulator = new PeriodSnapshotAccumulator(Duration.ofDays(1),
Duration.ZERO)
}
- def counter(name: String, action: String) = {
+ def counter(metricName: String, namespace: String, action: String) = {
System.out.println()
snapshotAccumulator
.peek()
.metrics
.counters
- .filter(_.name == name)
+ .filter(_.name == metricName)
.filter((t) => t.tags.get(actionNamespace).get == namespace)
.filter((t) => t.tags.get(initiatorNamespace).get == initiator)
.filter((t) => t.tags.get(actionName).get == action)
}
- def histogram(name: String, action: String) = {
+ def namespaceCounter(metricName: String, namespace: String) = {
+ System.out.println()
+ snapshotAccumulator
+ .peek()
+ .metrics
+ .counters
+ .filter(_.name == metricName)
+ .filter((t) => t.tags.get(actionNamespace).get == namespace)
+ .filter((t) => t.tags.get(initiatorNamespace).get == initiator)
+ }
+
+ def histogram(metricName: String, namespace: String, action: String) = {
snapshotAccumulator
.peek()
.metrics
.histograms
- .filter(_.name == name)
+ .filter(_.name == metricName)
.filter((t) => t.tags.get(actionNamespace).get == namespace)
.filter((t) => t.tags.get(initiatorNamespace).get == initiator)
.filter((t) => t.tags.get(actionName).get == action)
diff --git
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index 2241a8b..c0ffedf 100644
---
a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++
b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -29,8 +29,9 @@ import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach
with PrometheusMetricNames {
behavior of "PrometheusConsumer"
- val namespace = "whisk.system"
- val initiator = "testNS"
+ val initiator = "initiatorTest"
+ val namespaceDemo = "demo"
+ val namespaceGuest = "guest"
val actionWithCustomPackage = "apimgmt/createApiOne"
val actionWithDefaultPackage = "createApi"
val kind = "nodejs:10"
@@ -42,75 +43,93 @@ class PrometheusRecorderTests extends KafkaSpecBase with
BeforeAndAfterEach with
val consumer = createConsumer(kafkaPort, system.settings.config)
publishStringMessageToKafka(
EventConsumer.userEventTopic,
- newActivationEvent(s"$namespace/$actionWithCustomPackage", kind, memory,
initiator).serialize)
+ newActivationEvent(s"$namespaceDemo/$actionWithCustomPackage", kind,
memory).serialize)
+ publishStringMessageToKafka(
+ EventConsumer.userEventTopic,
+ newActivationEvent(s"$namespaceDemo/$actionWithDefaultPackage", kind,
memory).serialize)
publishStringMessageToKafka(
EventConsumer.userEventTopic,
- newActivationEvent(s"$namespace/$actionWithDefaultPackage", kind,
memory, initiator).serialize)
+ newActivationEvent(s"$namespaceGuest/$actionWithDefaultPackage", kind,
memory).serialize)
// Custom package
sleep(sleepAfterProduce, "sleeping post produce")
consumer.shutdown().futureValue
- counterTotal(activationMetric, actionWithCustomPackage) shouldBe 1
- counter(coldStartMetric, actionWithCustomPackage) shouldBe 1
- counterStatus(statusMetric, actionWithCustomPackage,
ActivationResponse.statusDeveloperError) shouldBe 1
+ counterTotal(activationMetric, namespaceDemo, actionWithCustomPackage)
shouldBe 1
+ counter(coldStartMetric, namespaceDemo, actionWithCustomPackage) shouldBe 1
+ counterStatus(statusMetric, namespaceDemo, actionWithCustomPackage,
ActivationResponse.statusDeveloperError) shouldBe 1
- histogramCount(waitTimeMetric, actionWithCustomPackage) shouldBe 1
- histogramSum(waitTimeMetric, actionWithCustomPackage) shouldBe (0.03 +-
0.001)
+ histogramCount(waitTimeMetric, namespaceDemo, actionWithCustomPackage)
shouldBe 1
+ histogramSum(waitTimeMetric, namespaceDemo, actionWithCustomPackage)
shouldBe (0.03 +- 0.001)
- histogramCount(initTimeMetric, actionWithCustomPackage) shouldBe 1
- histogramSum(initTimeMetric, actionWithCustomPackage) shouldBe (433.433 +-
0.01)
+ histogramCount(initTimeMetric, namespaceDemo, actionWithCustomPackage)
shouldBe 1
+ histogramSum(initTimeMetric, namespaceDemo, actionWithCustomPackage)
shouldBe (433.433 +- 0.01)
- histogramCount(durationMetric, actionWithCustomPackage) shouldBe 1
- histogramSum(durationMetric, actionWithCustomPackage) shouldBe (1.254 +-
0.01)
+ histogramCount(durationMetric, namespaceDemo, actionWithCustomPackage)
shouldBe 1
+ histogramSum(durationMetric, namespaceDemo, actionWithCustomPackage)
shouldBe (1.254 +- 0.01)
- gauge(memoryMetric, actionWithCustomPackage).intValue() shouldBe 256
+ gauge(memoryMetric, namespaceDemo, actionWithCustomPackage).intValue()
shouldBe 256
// Default package
- counterTotal(activationMetric, actionWithDefaultPackage) shouldBe 1
+ counterTotal(activationMetric, namespaceDemo, actionWithDefaultPackage)
shouldBe 1
+
+ // Blacklisted namespace should not be tracked
+ counterTotal(activationMetric, namespaceGuest, actionWithDefaultPackage)
shouldBe 0
+
+ // Blacklisted should be counted in
"openwhisk_namespace_activations_total" metric
+ namespaceCounterTotal(namespaceMetric, namespaceGuest) shouldBe 1
}
- private def newActivationEvent(name: String, kind: String, memory: String,
initiator: String) =
+ private def newActivationEvent(actionPath: String, kind: String, memory:
String) =
EventMessage(
"test",
- Activation(name, 2, 1254.millis, 30.millis, 433433.millis, kind, false,
memory.toInt, None),
+ Activation(actionPath, 2, 1254.millis, 30.millis, 433433.millis, kind,
false, memory.toInt, None),
Subject("testuser"),
initiator,
UUID("test"),
Activation.typeName)
- private def gauge(name: String, action: String) =
+ private def gauge(metricName: String, namespace: String, action: String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- name,
+ metricName,
Array("namespace", "initiator", "action"),
Array(namespace, initiator, action))
- private def counter(name: String, action: String) =
+ private def counter(metricName: String, namespace: String, action: String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- name,
+ metricName,
Array("namespace", "initiator", "action"),
Array(namespace, initiator, action))
- private def counterTotal(name: String, action: String) =
+ private def counterTotal(metricName: String, namespace: String, action:
String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- name,
+ metricName,
Array("namespace", "initiator", "action", "kind", "memory"),
Array(namespace, initiator, action, kind, memory))
- private def counterStatus(name: String, action: String, status: String) =
+ private def namespaceCounterTotal(metricName: String, namespace: String) =
+ CollectorRegistry.defaultRegistry.getSampleValue(
+ metricName,
+ Array("namespace", "initiator"),
+ Array(namespace, initiator))
+
+ private def counterStatus(metricName: String, namespace: String, action:
String, status: String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- name,
+ metricName,
Array("namespace", "initiator", "action", "status"),
Array(namespace, initiator, action, status))
- private def histogramCount(name: String, action: String) =
+ private def histogramCount(metricName: String, namespace: String, action:
String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- s"${name}_count",
+ s"${metricName}_count",
Array("namespace", "initiator", "action"),
Array(namespace, initiator, action))
- private def histogramSum(name: String, action: String) =
+ private def histogramSum(metricName: String, namespace: String, action:
String) =
CollectorRegistry.defaultRegistry
- .getSampleValue(s"${name}_sum", Array("namespace", "initiator",
"action"), Array(namespace, initiator, action))
+ .getSampleValue(
+ s"${metricName}_sum",
+ Array("namespace", "initiator", "action"),
+ Array(namespace, initiator, action))
.doubleValue()
}