This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 5b28f46 Record clusterId performing change in CosmosDB (#4312)
5b28f46 is described below
commit 5b28f46bf6858183e10f33a3eb408e84d4ec9a95
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Mon Mar 4 17:21:06 2019 +0530
Record clusterId performing change in CosmosDB (#4312)
Records the clusterId (if configured) with the updated document while
persisting it in CosmosDB. This is an optional property. By default, no `clusterId`
info is saved.
---
common/scala/src/main/resources/application.conf | 4 ++
.../database/cosmosdb/CosmosDBArtifactStore.scala | 22 ++++++++++-
.../core/database/cosmosdb/CosmosDBConfig.scala | 3 +-
.../core/database/cosmosdb/CosmosDBUtil.scala | 10 +++++
.../cosmosdb/CosmosDBArtifactStoreTests.scala | 46 +++++++++++++++++++---
.../database/cosmosdb/CosmosDBConfigTests.scala | 12 ++++--
.../cosmosdb/CosmosDBStoreBehaviorBase.scala | 2 +-
7 files changed, 87 insertions(+), 12 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 1cf474f..f73d8ec 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -197,6 +197,10 @@ whisk {
# ENABLING THIS VALUE MEANS YOUR DATA WILL NOT BE PERMANENTLY STORED
# Can only be used for `WhiskActivation` for now
# time-to-live = 60 s
+
+ # Specifies the current clusterId, whose value is recorded with the
document upon any update
+ # to indicate which cluster made the change. By default, no such value
is recorded.
+ # cluster-id =
connection-policy {
max-pool-size = 1000
# When the value of this property is true, the SDK will direct
write operations to
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 3d08991..79bd179 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -75,12 +75,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
private val queryToken = createToken("query")
private val countToken = createToken("count")
private val putAttachmentToken = createToken("putAttachment", read = false)
+ private val clusterIdValue = config.clusterId.map(JsString(_))
logging.info(
this,
s"Initializing CosmosDBArtifactStore for collection [$collName]. Service
endpoint [${client.getServiceEndpoint}], " +
s"Read endpoint [${client.getReadEndpoint}], Write endpoint
[${client.getWriteEndpoint}], Connection Policy
[${client.getConnectionPolicy}], " +
- s"Time to live [${collection.getDefaultTimeToLive} secs")
+ s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId
[${config.clusterId}]")
//Clone the returned instance as these are mutable
def documentCollection(): DocumentCollection = new
DocumentCollection(collection.toJson)
@@ -201,6 +202,22 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
failure => s"[GET_BY_ID] '$collName' internal error, doc: '$id',
failure: '${failure.getMessage}'")
}
+ /**
+ * Method exposed for test cases to access the raw json returned by CosmosDB
+ */
+ private[cosmosdb] def getRaw(id: DocId): Future[Option[JsObject]] = {
+ client
+ .readDocument(selfLinkOf(id), newRequestOption(id))
+ .head()
+ .map { rr =>
+ val js = rr.getResource.toJson.parseJson.asJsObject
+ Some(js)
+ }
+ .recoverWith {
+ case e: DocumentClientException if isNotFound(e) =>
Future.successful(None)
+ }
+ }
+
override protected[core] def query(table: String,
startKey: List[Any],
endKey: List[Any],
@@ -464,7 +481,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
Seq(
(cid, Some(JsString(escapeId(json.fields(_id).convertTo[String])))),
(etag, json.fields.get(_rev)),
- (computed, computedOpt))
+ (computed, computedOpt),
+ (clusterId, clusterIdValue))
val fieldsToRemove = Seq(_id, _rev)
val mapped = transform(json, fieldsToAdd, fieldsToRemove)
val doc = new Document(mapped.compactPrint)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index 564b0f1..a7e40ae 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -37,7 +37,8 @@ case class CosmosDBConfig(endpoint: String,
throughput: Int,
consistencyLevel: ConsistencyLevel,
connectionPolicy: ConnectionPolicy,
- timeToLive: Option[Duration]) {
+ timeToLive: Option[Duration],
+ clusterId: Option[String]) {
def createClient(): AsyncDocumentClient = {
new AsyncDocumentClient.Builder()
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index c4237ba..1fa74d2 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -23,6 +23,10 @@ import
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
import scala.collection.immutable.Iterable
private[cosmosdb] object CosmosDBConstants {
+
+ /**
+ * Stores the computed properties required for view related queries
+ */
val computed: String = "_c"
val alias: String = "view"
@@ -34,6 +38,12 @@ private[cosmosdb] object CosmosDBConstants {
val aggregate: String = AGGREGATE
val selfLink: String = SELF_LINK
+
+ /**
+ * Records the clusterId which performed the change in any document. This can
vary over
+ * the lifetime of a document, as different clusters may change the same document
at different times.
+ */
+ val clusterId: String = "_clusterId"
}
private[cosmosdb] trait CosmosDBUtil {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index d9f4d66..9898317 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -17,16 +17,25 @@
package org.apache.openwhisk.core.database.cosmosdb
+import com.typesafe.config.ConfigFactory
import io.netty.util.ResourceLeakDetector
import io.netty.util.ResourceLeakDetector.Level
import org.apache.openwhisk.common.TransactionId
-import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Pending}
-import org.scalatest.junit.JUnitRunner
-import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
-import org.apache.openwhisk.core.entity.WhiskActivation
import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{
+ DocumentReader,
+ WhiskActivation,
+ WhiskDocumentReader,
+ WhiskEntity,
+ WhiskEntityJsonFormat,
+ WhiskPackage
+}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Pending}
+import spray.json.JsString
@RunWith(classOf[JUnitRunner])
class CosmosDBArtifactStoreTests extends FlatSpec with
CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
@@ -84,6 +93,33 @@ class CosmosDBArtifactStoreTests extends FlatSpec with
CosmosDBStoreBehaviorBase
}
}
+ it should "have clusterId set" in {
+ implicit val tid: TransactionId = TransactionId.testing
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ implicit val format = WhiskEntityJsonFormat
+ val conf = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | collections {
+ | WhiskEntity = {
+ | cluster-id = "foo"
+ | }
+ | }
+ | }
+ """.stripMargin).withFallback(ConfigFactory.load())
+
+ val cosmosDBConfig = CosmosDBConfig(conf, "WhiskEntity")
+ cosmosDBConfig.clusterId shouldBe Some("foo")
+
+ val testConfig = cosmosDBConfig.copy(db = config.db)
+ val store =
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](testConfig, None)
+
+ val pkg = WhiskPackage(newNS(), aname())
+ val info = put(store, pkg)
+
+ val js = store.getRaw(info.id).futureValue
+ js.get.fields(CosmosDBConstants.clusterId) shouldBe JsString("foo")
+ }
+
behavior of "CosmosDB query debug"
it should "log query metrics in debug flow" in {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
index 94c36bd..979312e 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -66,7 +66,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
+ cosmos.endpoint shouldBe "http://localhost"
+ cosmos.key shouldBe "foo"
+ cosmos.db shouldBe "openwhisk"
}
it should "work with extended config" in {
@@ -81,7 +83,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
+ cosmos.endpoint shouldBe "http://localhost"
+ cosmos.key shouldBe "foo"
+ cosmos.db shouldBe "openwhisk"
cosmos.connectionPolicy.maxPoolSize shouldBe 42
val policy = cosmos.connectionPolicy.asJava
@@ -115,7 +119,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
+ cosmos.endpoint shouldBe "http://localhost"
+ cosmos.key shouldBe "foo"
+ cosmos.db shouldBe "openwhisk"
val policy = cosmos.connectionPolicy.asJava
policy.isUsingMultipleWriteLocations shouldBe true
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
index 1dc4ed2..18fbf29 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -37,7 +37,7 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with
ArtifactStoreBehaviorBase
override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
- private lazy val config: CosmosDBConfig = storeConfig.copy(db =
createTestDB().getId)
+ protected lazy val config: CosmosDBConfig = storeConfig.copy(db =
createTestDB().getId)
override lazy val authStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader