This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ddd3c9  Collect and log CosmosDB query metrics when extra logging is enabled (#4275)
2ddd3c9 is described below

commit 2ddd3c9cfd1b31176a54634f254707774a90f4f0
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Thu Feb 28 22:21:05 2019 +0530

    Collect and log CosmosDB query metrics when extra logging is enabled (#4275)
    
    * Collect and log CosmosDB query metrics when extra logging is enabled. Fixes #4268
---
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 29 ++++++++++++++++----
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      | 32 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index a20795b..3d08991 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -40,6 +40,7 @@ import org.apache.openwhisk.http.Messages
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Success
 
 class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected val collName: String,
                                                                        
protected val config: CosmosDBConfig,
@@ -223,11 +224,22 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
 
     val querySpec = viewMapper.prepareQuery(ddoc, viewName, startKey, endKey, 
realLimit, realIncludeDocs, descending)
 
+    val options = newFeedOptions()
+    val queryMetrics = scala.collection.mutable.Buffer[QueryMetrics]()
+    if (transid.meta.extraLogging) {
+      options.setPopulateQueryMetrics(true)
+    }
+
+    def collectQueryMetrics(r: FeedResponse[Document]): Unit = {
+      collectMetrics(queryToken, r.getRequestCharge)
+      queryMetrics.appendAll(r.getQueryMetrics.values().asScala)
+    }
+
     val publisher =
-      
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, 
querySpec, newFeedOptions()))
+      
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, 
querySpec, options))
     val f = Source
       .fromPublisher(publisher)
-      .wireTap(Sink.foreach(r => collectMetrics(queryToken, 
r.getRequestCharge)))
+      .wireTap(Sink.foreach(collectQueryMetrics))
       .mapConcat(asSeq)
       .drop(skip)
       .map(queryResultToWhiskJsonDoc)
@@ -240,10 +252,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .map(_.toList)
       .map(l => if (limit > 0) l.take(limit) else l)
 
-    f.foreach { out =>
-      transid.finished(this, start, s"[QUERY] '$collName' completed: matched 
${out.size}")
+    val g = f.andThen {
+      case Success(out) =>
+        if (queryMetrics.nonEmpty) {
+          val combinedMetrics = QueryMetrics.ZERO.add(queryMetrics: _*)
+          logging.debug(
+            this,
+            s"[QueryMetricsEnabled] Collection [$collName] - Query 
[${querySpec.getQueryText}].\nQueryMetrics\n[$combinedMetrics]")
+        }
+        transid.finished(this, start, s"[QUERY] '$collName' completed: matched 
${out.size}")
     }
-    reportFailure(f, start, failure => s"[QUERY] '$collName' internal error, 
failure: '${failure.getMessage}'")
+    reportFailure(g, start, failure => s"[QUERY] '$collName' internal error, 
failure: '${failure.getMessage}'")
   }
 
   override protected[core] def count(table: String,
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index 77962a6..d9f4d66 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -19,11 +19,14 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import io.netty.util.ResourceLeakDetector
 import io.netty.util.ResourceLeakDetector.Level
+import org.apache.openwhisk.common.TransactionId
 import org.junit.runner.RunWith
 import org.scalatest.{FlatSpec, Pending}
 import org.scalatest.junit.JUnitRunner
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
+import org.apache.openwhisk.core.entity.WhiskActivation
+import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
 
 @RunWith(classOf[JUnitRunner])
 class CosmosDBArtifactStoreTests extends FlatSpec with 
CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
@@ -80,4 +83,33 @@ class CosmosDBArtifactStoreTests extends FlatSpec with 
CosmosDBStoreBehaviorBase
       }
     }
   }
+
+  behavior of "CosmosDB query debug"
+
+  it should "log query metrics in debug flow" in {
+    val debugTid = TransactionId("42", extraLogging = true)
+    val tid = TransactionId("42")
+    val ns = newNS()
+    val activations = (1000 until 1100 by 10).map(newActivation(ns.asString, 
"testact", _))
+    activations foreach (put(activationStore, _)(tid))
+
+    val entityPath = s"${ns.asString}/testact"
+    stream.reset()
+    query[WhiskActivation](
+      activationStore,
+      WhiskActivation.filtersView.name,
+      List(entityPath, 1050),
+      List(entityPath, TOP, TOP))(tid)
+
+    stream.toString should not include ("[QueryMetricsEnabled]")
+    stream.reset()
+
+    query[WhiskActivation](
+      activationStore,
+      WhiskActivation.filtersView.name,
+      List(entityPath, 1050),
+      List(entityPath, TOP, TOP))(debugTid)
+    stream.toString should include("[QueryMetricsEnabled]")
+
+  }
 }

Reply via email to