This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new e0d562e Emit CosmosDB request usage metric. (#4023) e0d562e is described below commit e0d562e19fe22aed49236b7ba32f94b157d459d4 Author: Chetan Mehrotra <chet...@apache.org> AuthorDate: Tue Sep 18 13:52:55 2018 +0530 Emit CosmosDB request usage metric. (#4023) --- .../src/main/scala/whisk/common/Logging.scala | 10 +++---- .../database/cosmosdb/CosmosDBArtifactStore.scala | 31 +++++++++++++++++++--- docs/metrics.md | 12 +++++++++ 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index 8149774..19642f9 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -182,8 +182,8 @@ case class LogMarkerToken(component: String, subAction: Option[String] = None, tags: Map[String, String] = Map.empty) { - override def toString = component + "_" + action + "_" + state - def toStringWithSubAction = + override val toString = component + "_" + action + "_" + state + val toStringWithSubAction = subAction.map(sa => component + "_" + action + "." 
+ sa + "_" + state).getOrElse(toString) def asFinish = copy(state = LoggingMarkers.finish) @@ -212,14 +212,14 @@ object MetricEmitter { val metrics = Kamon.metrics - def emitCounterMetric(token: LogMarkerToken): Unit = { + def emitCounterMetric(token: LogMarkerToken, times: Long = 1): Unit = { if (TransactionId.metricsKamon) { if (TransactionId.metricsKamonTags) { metrics .counter(token.toString, token.tags) - .increment(1) + .increment(times) } else { - metrics.counter(token.toStringWithSubAction).increment(1) + metrics.counter(token.toStringWithSubAction).increment(times) } } } diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index 116995e..9292d17 100644 --- a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -28,7 +28,7 @@ import akka.util.{ByteString, ByteStringBuilder} import com.microsoft.azure.cosmosdb._ import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat, _} -import whisk.common.{Logging, LoggingMarkers, TransactionId} +import whisk.common.{LogMarkerToken, Logging, LoggingMarkers, MetricEmitter, TransactionId} import whisk.core.database.StoreUtils.{checkDocHasRevision, deserialize, reportFailure} import whisk.core.database._ import whisk.core.database.cosmosdb.CosmosDBArtifactStoreProvider.DocumentClientRef @@ -67,6 +67,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected private val _id = "_id" private val _rev = "_rev" + private val putToken = createToken("put", read = false) + private val delToken = createToken("del", read = false) + private val getToken = createToken("get") + private val queryToken = createToken("query") + private val countToken = createToken("count") + 
private val putAttachmentToken = createToken("putAttachment", read = false) + override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher override protected[database] def put(d: DocumentAbstraction)(implicit transid: TransactionId): Future[DocInfo] = { @@ -88,6 +95,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .transform( { r => transid.finished(this, start, s"[PUT] '$collName' completed document: '$docinfoStr'") + collectMetrics(putToken, r.getRequestCharge) toDocInfo(r.getResource) }, { case e: DocumentClientException if isConflict(e) => @@ -106,8 +114,9 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc)) .head() .transform( - { _ => + { r => transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'") + collectMetrics(delToken, r.getRequestCharge) true }, { case e: DocumentClientException if isNotFound(e) => @@ -139,6 +148,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected { rr => val js = getResultToWhiskJsonDoc(rr.getResource) transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'") + collectMetrics(getToken, rr.getRequestCharge) deserialize[A, DocumentAbstraction](doc, js) }, { case e: DocumentClientException if isNotFound(e) => @@ -167,6 +177,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .map { rr => val js = getResultToWhiskJsonDoc(rr.getResource) transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: found document '$id'") + collectMetrics(getToken, rr.getRequestCharge) Some(js) } .recoverWith { @@ -206,6 +217,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions())) val f = Source .fromPublisher(publisher) + .wireTap(Sink.foreach(r => 
collectMetrics(queryToken, r.getRequestCharge))) .mapConcat(asSeq) .drop(skip) .map(queryResultToWhiskJsonDoc) @@ -241,6 +253,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .map { r => val count = r.getResults.asScala.head.getLong(aggregate).longValue() transid.finished(this, start, s"[COUNT] '$collName' completed: count $count") + collectMetrics(countToken, r.getRequestCharge) if (count > skip) count - skip else 0L } @@ -321,9 +334,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected .upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc)) .head() .transform( - { _ => + { r => transid .finished(this, start, s"[ATT_PUT] '$collName' completed uploading attachment '$name' of document '$doc'") + collectMetrics(putAttachmentToken, r.getRequestCharge) doc //Adding attachment does not change the revision of document. So retain the doc info }, { case e: DocumentClientException if isConflict(e) => @@ -495,4 +509,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected require(doc.getId != null, s"$doc does not have id field set") require(doc.getETag != null, s"$doc does not have etag field set") } + + private def collectMetrics(token: LogMarkerToken, charge: Double): Unit = { + MetricEmitter.emitCounterMetric(token, Math.round(charge)) + } + + private def createToken(action: String, read: Boolean = true): LogMarkerToken = { + val mode = if (read) "read" else "write" + val tags = Map("action" -> action, "mode" -> mode, "collection" -> collName) + if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", "used", tags = tags) + else LogMarkerToken("cosmosdb", "ru", collName, Some(action)) + } } diff --git a/docs/metrics.md b/docs/metrics.md index 33e2b7e..0e96321 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -256,6 +256,18 @@ Operation Types * `saveDocument` * `saveDocumentBulk` +#### CosmosDB RU Metrics + +When database used is CosmosDB then 
metrics related to CosmosDB Request Units (RU) are also emitted. + +If Kamon tags are enabled then the metric name is `openwhisk.counter.cosmosdb_ru_used` with the following tags + +- `mode` - `read` or `write` +- `collection` - Name of collection. Example `activations`, `whisks` and `subjects` +- `action` - Type of operation performed. Example `get`, `put`, `del`, `query` and `count` + +If Kamon tags are not enabled then the metric name is of the form `openwhisk.counter.cosmosdb.ru.<collection>.<action>` + ## User specific metrics ### Configuration User metrics are enabled by default and could be explicitly disabled by setting the following property in one of the Ansible configuration files: