This is an automated email from the ASF dual-hosted git repository. tysonnorris pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
     new 72aae3d Ensure CosmosDBArtifactStore return query results within limits (#4061)

72aae3d is described below

commit 72aae3d9d4b2e42847095561fe0f6dd9f397e635
Author: Chetan Mehrotra <chet...@apache.org>
AuthorDate: Wed Oct 10 22:56:47 2018 +0530

    Ensure CosmosDBArtifactStore return query results within limits (#4061)

    * Allow tests to reuse existing database instead of creating new db every time

      This can be enabled by setting system property `whisk.cosmosdb.useExistingDB` to true

    * Ensure that limit is properly honored

    * Improve the testcase

    * Remove asserting that query result size should be 2

      Result is of size 2 only for CosmosDB. For other cases only single result is returned

    * Modify the query to also check if key is defined
---
 .../database/cosmosdb/CosmosDBArtifactStore.scala | 8 ++++-
 .../database/cosmosdb/CosmosDBViewMapper.scala | 2 +-
 .../database/cosmosdb/CosmosDBTestSupport.scala | 41 ++++++++++++++++++----
 .../ArtifactStoreSubjectQueryBehaviors.scala | 33 ++++++++++++++++-
 4 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala index ca3acba..24dc040 100644 --- a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala @@ -218,7 +218,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected val publisher = RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink, querySpec, newFeedOptions())) - val f = Source + val s = Source .fromPublisher(publisher) .wireTap(Sink.foreach(r => collectMetrics(queryToken, r.getRequestCharge))) .mapConcat(asSeq) @@ -229,6 +229,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: DocumentSerializer](protected 
.transformViewResult(ddoc, viewName, startKey, endKey, realIncludeDocs, js, CosmosDBArtifactStore.this)) .mapAsync(1)(identity) .mapConcat(identity) + + //If limit is specified then only take that many elements. With join its possible that + //number of entries become > limit + val s2 = if (limit > 0) s.take(limit) else s + + val f = s2 .runWith(Sink.seq) .map(_.toList) diff --git a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala index f3d7353..3a5bd89 100644 --- a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala +++ b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala @@ -296,7 +296,7 @@ private[cosmosdb] object SubjectViewMapper extends CosmosDBViewMapper { val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)" val (where, params) = startKey match { case (ns: String) :: Nil => - (s"$notBlocked AND (r.$SUBJECT = @name OR n.$NAME = @name)", ("@name", ns) :: Nil) + (s"$notBlocked AND ((r.$SUBJECT = @name AND IS_DEFINED(r.$KEY)) OR n.$NAME = @name)", ("@name", ns) :: Nil) case (uuid: String) :: (key: String) :: Nil => ( s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID = @uuid AND n.$KEY = @key))", diff --git a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala index 2743538..f783c79 100644 --- a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala +++ b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala @@ -17,7 +17,7 @@ package whisk.core.database.cosmosdb -import com.microsoft.azure.cosmosdb.Database +import com.microsoft.azure.cosmosdb.{Database, SqlParameter, SqlParameterCollection, SqlQuerySpec} import org.scalatest.{BeforeAndAfterAll, FlatSpec} import pureconfig.loadConfigOrThrow import 
whisk.core.ConfigKeys @@ -31,6 +31,7 @@ trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObserva lazy val storeConfigTry = Try { loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) } lazy val client = CosmosDBUtil.createClient(storeConfig) + val useExistingDB = java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB") def storeConfig = storeConfigTry.get @@ -44,17 +45,43 @@ trait CosmosDBTestSupport extends FlatSpec with BeforeAndAfterAll with RxObserva } protected def createTestDB() = { + if (useExistingDB) { + val db = getOrCreateDatabase() + println(s"Using existing database ${db.getId}") + db + } else { + val databaseDefinition = new Database + databaseDefinition.setId(generateDBName()) + val db = client.createDatabase(databaseDefinition, null).blockingResult() + dbsToDelete += db + println(s"Created database ${db.getId}") + db + } + } + + private def getOrCreateDatabase(): Database = { + client + .queryDatabases(querySpec(storeConfig.db), null) + .blockingOnlyResult() + .getOrElse { + client.createDatabase(newDatabase, null).blockingResult() + } + } + + protected def querySpec(id: String) = + new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new SqlParameterCollection(new SqlParameter("@id", id))) + + private def newDatabase = { val databaseDefinition = new Database - databaseDefinition.setId(generateDBName()) - val db = client.createDatabase(databaseDefinition, null).blockingResult() - dbsToDelete += db - println(s"Credted database ${db.getId}") - db + databaseDefinition.setId(storeConfig.db) + databaseDefinition } override def afterAll(): Unit = { super.afterAll() - dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink, null).blockingResult()) + if (!useExistingDB) { + dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink, null).blockingResult()) + } client.close() } } diff --git a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala 
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala index 06150d5..5b34ec0 100644 --- a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala +++ b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala @@ -17,11 +17,12 @@ package whisk.core.database.test.behavior -import spray.json.{JsBoolean, JsObject} +import spray.json.{JsBoolean, JsObject, JsString} import whisk.common.TransactionId import whisk.core.database.NoDocumentException import whisk.core.entity._ import whisk.core.invoker.NamespaceBlacklist +import whisk.utils.JsHelpers trait ArtifactStoreSubjectQueryBehaviors extends ArtifactStoreBehaviorBase { @@ -81,6 +82,36 @@ trait ArtifactStoreSubjectQueryBehaviors extends ArtifactStoreBehaviorBase { Identity.get(authStore, ak1).failed.futureValue shouldBe a[NoDocumentException] } + it should "find subject having multiple namespaces" in { + implicit val tid: TransactionId = transid() + val uuid1 = UUID() + val uuid2 = UUID() + val ak1 = BasicAuthenticationAuthKey(uuid1, Secret()) + val ak2 = BasicAuthenticationAuthKey(uuid2, Secret()) + val ns1 = Namespace(aname(), uuid1) + val ns2 = Namespace(aname(), uuid2) + + val auth = WhiskAuth( + Subject(ns1.name.name), + Set( + WhiskNamespace(ns1, BasicAuthenticationAuthKey(ak1.uuid, ak1.key)), + WhiskNamespace(ns2, BasicAuthenticationAuthKey(ak2.uuid, ak2.key)))) + + put(authStore, auth) + + waitOnView(authStore, BasicAuthenticationAuthKey(ak1.uuid, ak1.key), 1) + + val i1 = Identity.get(authStore, ns1.name).futureValue + i1.subject shouldBe auth.subject + i1.namespace shouldBe ns1 + + //Also check if all results returned match the provided namespace + val seq = Identity.list(authStore, List(ns1.name.asString), limit = 100).futureValue + seq.foreach { js => + JsHelpers.getFieldPath(js, "value", "namespace").get shouldBe JsString(i1.namespace.name.asString) + } + } + it should "find subject by 
namespace with limits" in { implicit val tid: TransactionId = transid() val uuid1 = UUID()