This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 2ddd3c9 Collect and log CosmosDB query metrics when extra logging is enabled (#4275)
2ddd3c9 is described below
commit 2ddd3c9cfd1b31176a54634f254707774a90f4f0
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Feb 28 22:21:05 2019 +0530
Collect and log CosmosDB query metrics when extra logging is enabled (#4275)
* Collect and log CosmosDB query metrics when extra logging is enabled
Fixes #4268
---
.../database/cosmosdb/CosmosDBArtifactStore.scala | 29 ++++++++++++++++----
.../cosmosdb/CosmosDBArtifactStoreTests.scala | 32 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 5 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index a20795b..3d08991 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -40,6 +40,7 @@ import org.apache.openwhisk.http.Messages
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val collName: String,
protected val config: CosmosDBConfig,
@@ -223,11 +224,22 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
val querySpec = viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, realLimit, realIncludeDocs, descending)
+ val options = newFeedOptions()
+ val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]()
+ if (transid.meta.extraLogging) {
+ options.setPopulateQueryMetrics(true)
+ }
+
+ def collectQueryMetrics(r: FeedResponse[Document]): Unit = {
+ collectMetrics(queryToken, r.getRequestCharge)
+ queryMetrics.appendAll(r.getQueryMetrics.values().asScala)
+ }
+
val publisher =
- RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions()))
+ RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, options))
val f = Source
.fromPublisher(publisher)
- .wireTap(Sink.foreach(r => collectMetrics(queryToken, r.getRequestCharge)))
+ .wireTap(Sink.foreach(collectQueryMetrics))
.mapConcat(asSeq)
.drop(skip)
.map(queryResultToWhiskJsonDoc)
@@ -240,10 +252,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected
.map(_.toList)
.map(l => if (limit > 0) l.take(limit) else l)
- f.foreach { out =>
- transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
+ val g = f.andThen {
+ case Success(out) =>
+ if (queryMetrics.nonEmpty) {
+ val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
+ logging.debug(
+ this,
+ s"[QueryMetricsEnabled] Collection [$collName] - Query [${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
+ }
+ transid.finished(this, start, s"[QUERY] '$collName' completed: matched ${out.size}")
}
- reportFailure(f, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
+ reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'")
}
override protected[core] def count(table: String,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index 77962a6..d9f4d66 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -19,11 +19,14 @@ package org.apache.openwhisk.core.database.cosmosdb
import io.netty.util.ResourceLeakDetector
import io.netty.util.ResourceLeakDetector.Level
+import org.apache.openwhisk.common.TransactionId
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Pending}
import org.scalatest.junit.JUnitRunner
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
+import org.apache.openwhisk.core.entity.WhiskActivation
+import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
@RunWith(classOf[JUnitRunner])
class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
@@ -80,4 +83,33 @@ class CosmosDBArtifactStoreTests extends FlatSpec with CosmosDBStoreBehaviorBase
}
}
}
+
+ behavior of "CosmosDB query debug"
+
+ it should "log query metrics in debug flow" in {
+ val debugTid = TransactionId("42", extraLogging = true)
+ val tid = TransactionId("42")
+ val ns = newNS()
+ val activations = (1000 until 1100 by 10).map(newActivation(ns.asString, "testact", _))
+ activations foreach (put(activationStore, _)(tid))
+
+ val entityPath = s"${ns.asString}/testact"
+ stream.reset()
+ query[WhiskActivation](
+ activationStore,
+ WhiskActivation.filtersView.name,
+ List(entityPath, 1050),
+ List(entityPath, TOP, TOP))(tid)
+
+ stream.toString should not include ("[QueryMetricsEnabled]")
+ stream.reset()
+
+ query[WhiskActivation](
+ activationStore,
+ WhiskActivation.filtersView.name,
+ List(entityPath, 1050),
+ List(entityPath, TOP, TOP))(debugTid)
+ stream.toString should include("[QueryMetricsEnabled]")
+
+ }
}