This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 72aae3d Ensure CosmosDBArtifactStore return query results within
limits (#4061)
72aae3d is described below
commit 72aae3d9d4b2e42847095561fe0f6dd9f397e635
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Wed Oct 10 22:56:47 2018 +0530
Ensure CosmosDBArtifactStore return query results within limits (#4061)
* Allow tests to reuse an existing database instead of creating a new db
every time
This can be enabled by setting system property
`whisk.cosmosdb.useExistingDB` to true
* Ensure that limit is properly honored
* Improve the testcase
* Remove the assertion that the query result size should be 2
The result is of size 2 only for CosmosDB. In other cases only a single result
is returned
* Modify the query to also check if key is defined
---
.../database/cosmosdb/CosmosDBArtifactStore.scala | 8 ++++-
.../database/cosmosdb/CosmosDBViewMapper.scala | 2 +-
.../database/cosmosdb/CosmosDBTestSupport.scala | 41 ++++++++++++++++++----
.../ArtifactStoreSubjectQueryBehaviors.scala | 33 ++++++++++++++++-
4 files changed, 74 insertions(+), 10 deletions(-)
diff --git
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index ca3acba..24dc040 100644
---
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -218,7 +218,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val publisher =
RxReactiveStreams.toPublisher(client.queryDocuments(collection.getSelfLink,
querySpec, newFeedOptions()))
- val f = Source
+ val s = Source
.fromPublisher(publisher)
.wireTap(Sink.foreach(r => collectMetrics(queryToken,
r.getRequestCharge)))
.mapConcat(asSeq)
@@ -229,6 +229,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
.transformViewResult(ddoc, viewName, startKey, endKey,
realIncludeDocs, js, CosmosDBArtifactStore.this))
.mapAsync(1)(identity)
.mapConcat(identity)
+
+ //If limit is specified then only take that many elements. With join its
possible that
+ //number of entries become > limit
+ val s2 = if (limit > 0) s.take(limit) else s
+
+ val f = s2
.runWith(Sink.seq)
.map(_.toList)
diff --git
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index f3d7353..3a5bd89 100644
---
a/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++
b/common/scala/src/main/scala/whisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -296,7 +296,7 @@ private[cosmosdb] object SubjectViewMapper extends
CosmosDBViewMapper {
val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)"
val (where, params) = startKey match {
case (ns: String) :: Nil =>
- (s"$notBlocked AND (r.$SUBJECT = @name OR n.$NAME = @name)", ("@name",
ns) :: Nil)
+ (s"$notBlocked AND ((r.$SUBJECT = @name AND IS_DEFINED(r.$KEY)) OR
n.$NAME = @name)", ("@name", ns) :: Nil)
case (uuid: String) :: (key: String) :: Nil =>
(
s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID =
@uuid AND n.$KEY = @key))",
diff --git
a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
index 2743538..f783c79 100644
---
a/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
+++
b/tests/src/test/scala/whisk/core/database/cosmosdb/CosmosDBTestSupport.scala
@@ -17,7 +17,7 @@
package whisk.core.database.cosmosdb
-import com.microsoft.azure.cosmosdb.Database
+import com.microsoft.azure.cosmosdb.{Database, SqlParameter,
SqlParameterCollection, SqlQuerySpec}
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import pureconfig.loadConfigOrThrow
import whisk.core.ConfigKeys
@@ -31,6 +31,7 @@ trait CosmosDBTestSupport extends FlatSpec with
BeforeAndAfterAll with RxObserva
lazy val storeConfigTry = Try {
loadConfigOrThrow[CosmosDBConfig](ConfigKeys.cosmosdb) }
lazy val client = CosmosDBUtil.createClient(storeConfig)
+ val useExistingDB =
java.lang.Boolean.getBoolean("whisk.cosmosdb.useExistingDB")
def storeConfig = storeConfigTry.get
@@ -44,17 +45,43 @@ trait CosmosDBTestSupport extends FlatSpec with
BeforeAndAfterAll with RxObserva
}
protected def createTestDB() = {
+ if (useExistingDB) {
+ val db = getOrCreateDatabase()
+ println(s"Using existing database ${db.getId}")
+ db
+ } else {
+ val databaseDefinition = new Database
+ databaseDefinition.setId(generateDBName())
+ val db = client.createDatabase(databaseDefinition, null).blockingResult()
+ dbsToDelete += db
+ println(s"Created database ${db.getId}")
+ db
+ }
+ }
+
+ private def getOrCreateDatabase(): Database = {
+ client
+ .queryDatabases(querySpec(storeConfig.db), null)
+ .blockingOnlyResult()
+ .getOrElse {
+ client.createDatabase(newDatabase, null).blockingResult()
+ }
+ }
+
+ protected def querySpec(id: String) =
+ new SqlQuerySpec("SELECT * FROM root r WHERE r.id=@id", new
SqlParameterCollection(new SqlParameter("@id", id)))
+
+ private def newDatabase = {
val databaseDefinition = new Database
- databaseDefinition.setId(generateDBName())
- val db = client.createDatabase(databaseDefinition, null).blockingResult()
- dbsToDelete += db
- println(s"Credted database ${db.getId}")
- db
+ databaseDefinition.setId(storeConfig.db)
+ databaseDefinition
}
override def afterAll(): Unit = {
super.afterAll()
- dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink,
null).blockingResult())
+ if (!useExistingDB) {
+ dbsToDelete.foreach(db => client.deleteDatabase(db.getSelfLink,
null).blockingResult())
+ }
client.close()
}
}
diff --git
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
index 06150d5..5b34ec0 100644
---
a/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
+++
b/tests/src/test/scala/whisk/core/database/test/behavior/ArtifactStoreSubjectQueryBehaviors.scala
@@ -17,11 +17,12 @@
package whisk.core.database.test.behavior
-import spray.json.{JsBoolean, JsObject}
+import spray.json.{JsBoolean, JsObject, JsString}
import whisk.common.TransactionId
import whisk.core.database.NoDocumentException
import whisk.core.entity._
import whisk.core.invoker.NamespaceBlacklist
+import whisk.utils.JsHelpers
trait ArtifactStoreSubjectQueryBehaviors extends ArtifactStoreBehaviorBase {
@@ -81,6 +82,36 @@ trait ArtifactStoreSubjectQueryBehaviors extends
ArtifactStoreBehaviorBase {
Identity.get(authStore, ak1).failed.futureValue shouldBe
a[NoDocumentException]
}
+ it should "find subject having multiple namespaces" in {
+ implicit val tid: TransactionId = transid()
+ val uuid1 = UUID()
+ val uuid2 = UUID()
+ val ak1 = BasicAuthenticationAuthKey(uuid1, Secret())
+ val ak2 = BasicAuthenticationAuthKey(uuid2, Secret())
+ val ns1 = Namespace(aname(), uuid1)
+ val ns2 = Namespace(aname(), uuid2)
+
+ val auth = WhiskAuth(
+ Subject(ns1.name.name),
+ Set(
+ WhiskNamespace(ns1, BasicAuthenticationAuthKey(ak1.uuid, ak1.key)),
+ WhiskNamespace(ns2, BasicAuthenticationAuthKey(ak2.uuid, ak2.key))))
+
+ put(authStore, auth)
+
+ waitOnView(authStore, BasicAuthenticationAuthKey(ak1.uuid, ak1.key), 1)
+
+ val i1 = Identity.get(authStore, ns1.name).futureValue
+ i1.subject shouldBe auth.subject
+ i1.namespace shouldBe ns1
+
+ //Also check if all results returned match the provided namespace
+ val seq = Identity.list(authStore, List(ns1.name.asString), limit =
100).futureValue
+ seq.foreach { js =>
+ JsHelpers.getFieldPath(js, "value", "namespace").get shouldBe
JsString(i1.namespace.name.asString)
+ }
+ }
+
it should "find subject by namespace with limits" in {
implicit val tid: TransactionId = transid()
val uuid1 = UUID()