This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 72aae3d  Ensure CosmosDBArtifactStore return query results within 
limits (#4061)
72aae3d is described below

commit 72aae3d9d4b2e42847095561fe0f6dd9f397e635
Author: Chetan Mehrotra <chet...@apache.org>
AuthorDate: Wed Oct 10 22:56:47 2018 +0530

    Ensure CosmosDBArtifactStore return query results within limits (#4061)
    
    * Allow tests to reuse existing database instead of creating new db 
every time
    
    This can be enabled by setting system property 
`whisk.cosmosdb.useExistingDB` to true
    
    * Ensure that limit is properly honored
    
    * Improve the testcase
    
    * Remove asserting that query result size should be 2
    
    Result is of size 2 only for CosmosDB. For other cases only single result 
is returned
    
    * Modify the query to also check if key is defined
---
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |  8 ++++-
 .../database/cosmosdb/CosmosDBViewMapper.scala     |  2 +-
 .../database/cosmosdb/CosmosDBTestSupport.scala    | 41 ++++++++++++++++++----
 .../ArtifactStoreSubjectQueryBehaviors.scala       | 33 ++++++++++++++++-
 4 files changed, 74 insertions(+), 10 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index ca3acba..24dc040 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -218,7 +218,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
 
     val publisher =
       
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, 
querySpec, newFeedOptions()))
-    val f = Source
+    val s = Source
       .fromPublisher(publisher)
       .wireTap(Sink.foreach(r => collectMetrics(queryToken, 
r.getRequestCharge)))
       .mapConcat(asSeq)
@@ -229,6 +229,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
           .transformViewResult(ddoc, viewName, startKey, endKey, 
realIncludeDocs, js, CosmosDBArtifactStore.this))
       .mapAsync(1)(identity)
       .mapConcat(identity)
+
+    //If limit is specified then only take that many elements. With join its 
possible that
+    //number of entries become > limit
+    val s2 = if (limit > 0) s.take(limit) else s
+
+    val f = s2
       .runWith(Sink.seq)
       .map(_.toList)
 
diff --git 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index f3d7353..3a5bd89 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -296,7 +296,7 @@ private[cosmosdb] object SubjectViewMapper extends 
CosmosDBViewMapper {
     val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)"
     val (where, params) = startKey match {
       case (ns: String) :: Nil =>
-        (s"$notBlocked AND (r.$SUBJECT = @name OR n.$NAME = @name)", ("@name", 
ns) :: Nil)
+        (s"$notBlocked AND ((r.$SUBJECT = @name AND IS_DEFINED(r.$KEY)) OR 
n.$NAME = @name)", ("@name", ns) :: Nil)
       case (uuid: String) :: (key: String) :: Nil =>
         (
           s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID = 
@uuid AND n.$KEY = @key))",
diff --git 
a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala 
b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
index 2743538..f783c79 100644
--- 
a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
+++ 
b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -17,7 +17,7 @@
 
 package whisk.core.database.cosmosdb
 
-import com.microsoft.azure.cosmosdb.Database
+import com.microsoft.azure.cosmosdb.{Database, SqlParameter, 
SqlParameterCollection, SqlQuerySpec}
 import org.scalatest.{BeforeAndAfterAll, FlatSpec}
 import pureconfig.loadConfigOrThrow
 import whisk.core.ConfigKeys
@@ -31,6 +31,7 @@ trait CosmosDBTestSupport extends FlatSpec with 
BeforeAndAfterAll with RxObserva
 
   lazy val storeConfigTry = Try { 
loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
   lazy val client = CosmosDBUtil.createClient(storeConfig)
+  val useExistingDB = 
java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB")
 
   def storeConfig = storeConfigTry.get
 
@@ -44,17 +45,43 @@ trait CosmosDBTestSupport extends FlatSpec with 
BeforeAndAfterAll with RxObserva
   }
 
   protected def createTestDB() = {
+    if (useExistingDB) {
+      val db = getOrCreateDatabase()
+      println(s"Using existing database ${db.getId}")
+      db
+    } else {
+      val databaseDefinition = new Database
+      databaseDefinition.setId(generateDBName())
+      val db = client.createDatabase(databaseDefinition, null).blockingResult()
+      dbsToDelete += db
+      println(s"Created database ${db.getId}")
+      db
+    }
+  }
+
+  private def getOrCreateDatabase(): Database = {
+    client
+      .queryDatabases(querySpec(storeConfig.db), null)
+      .blockingOnlyResult()
+      .getOrElse {
+        client.createDatabase(newDatabase, null).blockingResult()
+      }
+  }
+
+  protected def querySpec(id: String) =
+    new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new 
SqlParameterCollection(new SqlParameter("@id", id)))
+
+  private def newDatabase = {
     val databaseDefinition = new Database
-    databaseDefinition.setId(generateDBName())
-    val db = client.createDatabase(databaseDefinition, null).blockingResult()
-    dbsToDelete += db
-    println(s"Credted database ${db.getId}")
-    db
+    databaseDefinition.setId(storeConfig.db)
+    databaseDefinition
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink, 
null).blockingResult())
+    if (!useExistingDB) {
+      dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink, 
null).blockingResult())
+    }
     client.close()
   }
 }
diff --git 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
 
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
index 06150d5..5b34ec0 100644
--- 
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
+++ 
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
@@ -17,11 +17,12 @@
 
 package whisk.core.database.test.behavior
 
-import spray.json.{JsBoolean, JsObject}
+import spray.json.{JsBoolean, JsObject, JsString}
 import whisk.common.TransactionId
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
 import whisk.core.invoker.NamespaceBlacklist
+import whisk.utils.JsHelpers
 
 trait ArtifactStoreSubjectQueryBehaviors extends ArtifactStoreBehaviorBase {
 
@@ -81,6 +82,36 @@ trait ArtifactStoreSubjectQueryBehaviors extends 
ArtifactStoreBehaviorBase {
     Identity.get(authStore, ak1).failed.futureValue shouldBe 
a[NoDocumentException]
   }
 
+  it should "find subject having multiple namespaces" in {
+    implicit val tid: TransactionId = transid()
+    val uuid1 = UUID()
+    val uuid2 = UUID()
+    val ak1 = BasicAuthenticationAuthKey(uuid1, Secret())
+    val ak2 = BasicAuthenticationAuthKey(uuid2, Secret())
+    val ns1 = Namespace(aname(), uuid1)
+    val ns2 = Namespace(aname(), uuid2)
+
+    val auth = WhiskAuth(
+      Subject(ns1.name.name),
+      Set(
+        WhiskNamespace(ns1, BasicAuthenticationAuthKey(ak1.uuid, ak1.key)),
+        WhiskNamespace(ns2, BasicAuthenticationAuthKey(ak2.uuid, ak2.key))))
+
+    put(authStore, auth)
+
+    waitOnView(authStore, BasicAuthenticationAuthKey(ak1.uuid, ak1.key), 1)
+
+    val i1 = Identity.get(authStore, ns1.name).futureValue
+    i1.subject shouldBe auth.subject
+    i1.namespace shouldBe ns1
+
+    //Also check if all results returned match the provided namespace
+    val seq = Identity.list(authStore, List(ns1.name.asString), limit = 
100).futureValue
+    seq.foreach { js =>
+      JsHelpers.getFieldPath(js, "value", "namespace").get shouldBe 
JsString(i1.namespace.name.asString)
+    }
+  }
+
   it should "find subject by namespace with limits" in {
     implicit val tid: TransactionId = transid()
     val uuid1 = UUID()

Reply via email to