markusthoemmes closed pull request #4023: Emit CosmosDB request usage metric
URL: https://github.com/apache/incubator-openwhisk/pull/4023
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala 
b/common/scala/src/main/scala/whisk/common/Logging.scala
index 8149774cf0..19642f9b97 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -182,8 +182,8 @@ case class LogMarkerToken(component: String,
                           subAction: Option[String] = None,
                           tags: Map[String, String] = Map.empty) {
 
-  override def toString = component + "_" + action + "_" + state
-  def toStringWithSubAction =
+  override val toString = component + "_" + action + "_" + state
+  val toStringWithSubAction =
     subAction.map(sa => component + "_" + action + "." + sa + "_" + 
state).getOrElse(toString)
 
   def asFinish = copy(state = LoggingMarkers.finish)
@@ -212,14 +212,14 @@ object MetricEmitter {
 
   val metrics = Kamon.metrics
 
-  def emitCounterMetric(token: LogMarkerToken): Unit = {
+  def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = {
     if (TransactionId.metricsKamon) {
       if (TransactionId.metricsKamonTags) {
         metrics
           .counter(token.toString, token.tags)
-          .increment(1)
+          .increment(times)
       } else {
-        metrics.counter(token.toStringWithSubAction).increment(1)
+        metrics.counter(token.toStringWithSubAction).increment(times)
       }
     }
   }
diff --git 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 116995e049..9292d17a4c 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -28,7 +28,7 @@ import akka.util.{ByteString, ByteStringBuilder}
 import com.microsoft.azure.cosmosdb._
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
 import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, 
RootJsonFormat, _}
-import whisk.common.{Logging, LoggingMarkers, TransactionId}
+import whisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, 
TransactionId}
 import whisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, 
reportFailure}
 import whisk.core.database._
 import 
whisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef
@@ -67,6 +67,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   private val _id = "_id"
   private val _rev = "_rev"
 
+  private val putToken = createToken("put", read = false)
+  private val delToken = createToken("del", read = false)
+  private val getToken = createToken("get")
+  private val queryToken = createToken("query")
+  private val countToken = createToken("count")
+  private val putAttachmentToken = createToken("putAttachment", read = false)
+
   override protected[core] implicit val executionContext: ExecutionContext = 
system.dispatcher
 
   override protected[database] def put(d: DocumentAbstraction)(implicit 
transid: TransactionId): Future[DocInfo] = {
@@ -88,6 +95,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .transform(
         { r =>
           transid.finished(this, start, s"[PUT] '$collName' completed 
document: '$docinfoStr'")
+          collectMetrics(putToken, r.getRequestCharge)
           toDocInfo(r.getResource)
         }, {
           case e: DocumentClientException if isConflict(e) =>
@@ -106,8 +114,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
       .head()
       .transform(
-        { _ =>
+        { r =>
           transid.finished(this, start, s"[DEL] '$collName' completed 
document: '$doc'")
+          collectMetrics(delToken, r.getRequestCharge)
           true
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -139,6 +148,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
         { rr =>
           val js = getResultToWhiskJsonDoc(rr.getResource)
           transid.finished(this, start, s"[GET] '$collName' completed: found 
document '$doc'")
+          collectMetrics(getToken, rr.getRequestCharge)
           deserialize[A, DocumentAbstraction](doc, js)
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -167,6 +177,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .map { rr =>
         val js = getResultToWhiskJsonDoc(rr.getResource)
         transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: 
found document '$id'")
+        collectMetrics(getToken, rr.getRequestCharge)
         Some(js)
       }
       .recoverWith {
@@ -206,6 +217,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, 
querySpec, newFeedOptions()))
     val f = Source
       .fromPublisher(publisher)
+      .wireTap(Sink.foreach(r => collectMetrics(queryToken, 
r.getRequestCharge)))
       .mapConcat(asSeq)
       .drop(skip)
       .map(queryResultToWhiskJsonDoc)
@@ -241,6 +253,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .map { r =>
         val count = r.getResults.asScala.head.getLong(aggregate).longValue()
         transid.finished(this, start, s"[COUNT] '$collName' completed: count 
$count")
+        collectMetrics(countToken, r.getRequestCharge)
         if (count > skip) count - skip else 0L
       }
 
@@ -321,9 +334,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc))
       .head()
       .transform(
-        { _ =>
+        { r =>
           transid
             .finished(this, start, s"[ATT_PUT] '$collName' completed uploading 
attachment '$name' of document '$doc'")
+          collectMetrics(putAttachmentToken, r.getRequestCharge)
           doc //Adding attachment does not change the revision of document. So 
retain the doc info
         }, {
           case e: DocumentClientException if isConflict(e) =>
@@ -495,4 +509,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     require(doc.getId != null, s"$doc does not have id field set")
     require(doc.getETag != null, s"$doc does not have etag field set")
   }
+
+  private def collectMetrics(token: LogMarkerToken, charge: Double): Unit = {
+    MetricEmitter.emitCounterMetric(token, Math.round(charge))
+  }
+
+  private def createToken(action: String, read: Boolean = true): 
LogMarkerToken = {
+    val mode = if (read) "read" else "write"
+    val tags = Map("action" -> action, "mode" -> mode, "collection" -> 
collName)
+    if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", 
"used", tags = tags)
+    else LogMarkerToken("cosmosdb", "ru", collName, Some(action))
+  }
 }
diff --git a/docs/metrics.md b/docs/metrics.md
index 33e2b7eda0..0e96321cd7 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -256,6 +256,18 @@ Operation Types
 * `saveDocument`
 * `saveDocumentBulk`
 
+#### CosmosDB RU Metrics
+
+When the database used is CosmosDB, metrics related to CosmosDB Request Units (RUs) 
are also emitted.
+
+If Kamon tags are enabled, the metric name is 
`openwhisk.counter.cosmosdb_ru_used` with the following tags:
+
+- `mode` - `read` or `write`
+- `collection` - Name of the collection. Examples: `activations`, `whisks` and 
`subjects`
+- `action` - Type of operation performed. Examples: `get`, `put`, `del`, `query` 
and `count`
+
+If Kamon tags are not enabled then metric name is of the form 
`openwhisk.counter.cosmosdb.ru.<collection>.<action>`
+
 ## User specific metrics
 ### Configuration
 User metrics are enabled by default and could be explicitly disabled by 
setting the following property in one of the Ansible configuration files:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to