This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b28f46  Record clusterId performing change in CosmosDB (#4312)
5b28f46 is described below

commit 5b28f46bf6858183e10f33a3eb408e84d4ec9a95
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Mon Mar 4 17:21:06 2019 +0530

    Record clusterId performing change in CosmosDB (#4312)
    
    Records the clusterId (if configured) with the updated document while 
persisting in CosmosDB. This is an optional property. By default no `clusterId` 
info is saved
---
 common/scala/src/main/resources/application.conf   |  4 ++
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 22 ++++++++++-
 .../core/database/cosmosdb/CosmosDBConfig.scala    |  3 +-
 .../core/database/cosmosdb/CosmosDBUtil.scala      | 10 +++++
 .../cosmosdb/CosmosDBArtifactStoreTests.scala      | 46 +++++++++++++++++++---
 .../database/cosmosdb/CosmosDBConfigTests.scala    | 12 ++++--
 .../cosmosdb/CosmosDBStoreBehaviorBase.scala       |  2 +-
 7 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 1cf474f..f73d8ec 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -197,6 +197,10 @@ whisk {
         # ENABLING THIS VALUE MEANS YOUR DATA WILL NOT BE PERMANENTLY STORED
         # Can only be used for `WhiskActivation` for now
         # time-to-live      = 60 s
+
+        # Specifies the current clusterId whose value is recorded with 
document upon any update
+        # to indicate which cluster made the change. By default no such value 
is recorded
+        # cluster-id        =
         connection-policy {
             max-pool-size = 1000
             # When the value of this property is true, the SDK will direct 
write operations to
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 3d08991..79bd179 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -75,12 +75,13 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   private val queryToken = createToken("query")
   private val countToken = createToken("count")
   private val putAttachmentToken = createToken("putAttachment", read = false)
+  private val clusterIdValue = config.clusterId.map(JsString(_))
 
   logging.info(
     this,
     s"Initializing CosmosDBArtifactStore for collection [$collName]. Service 
endpoint [${client.getServiceEndpoint}], " +
       s"Read endpoint [${client.getReadEndpoint}], Write endpoint 
[${client.getWriteEndpoint}], Connection Policy 
[${client.getConnectionPolicy}], " +
-      s"Time to live [${collection.getDefaultTimeToLive} secs")
+      s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId 
[${config.clusterId}]")
 
   //Clone the returned instance as these are mutable
   def documentCollection(): DocumentCollection = new 
DocumentCollection(collection.toJson)
@@ -201,6 +202,22 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       failure => s"[GET_BY_ID] '$collName' internal error, doc: '$id', 
failure: '${failure.getMessage}'")
   }
 
+  /**
+   * Method exposed for test cases to access the raw json returned by CosmosDB
+   */
+  private[cosmosdb] def getRaw(id: DocId): Future[Option[JsObject]] = {
+    client
+      .readDocument(selfLinkOf(id), newRequestOption(id))
+      .head()
+      .map { rr =>
+        val js = rr.getResource.toJson.parseJson.asJsObject
+        Some(js)
+      }
+      .recoverWith {
+        case e: DocumentClientException if isNotFound(e) => 
Future.successful(None)
+      }
+  }
+
   override protected[core] def query(table: String,
                                      startKey: List[Any],
                                      endKey: List[Any],
@@ -464,7 +481,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       Seq(
         (cid, Some(JsString(escapeId(json.fields(_id).convertTo[String])))),
         (etag, json.fields.get(_rev)),
-        (computed, computedOpt))
+        (computed, computedOpt),
+        (clusterId, clusterIdValue))
     val fieldsToRemove = Seq(_id, _rev)
     val mapped = transform(json, fieldsToAdd, fieldsToRemove)
     val doc = new Document(mapped.compactPrint)
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index 564b0f1..a7e40ae 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -37,7 +37,8 @@ case class CosmosDBConfig(endpoint: String,
                           throughput: Int,
                           consistencyLevel: ConsistencyLevel,
                           connectionPolicy: ConnectionPolicy,
-                          timeToLive: Option[Duration]) {
+                          timeToLive: Option[Duration],
+                          clusterId: Option[String]) {
 
   def createClient(): AsyncDocumentClient = {
     new AsyncDocumentClient.Builder()
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index c4237ba..1fa74d2 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -23,6 +23,10 @@ import 
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
 import scala.collection.immutable.Iterable
 
 private[cosmosdb] object CosmosDBConstants {
+
+  /**
+   * Stores the computed properties required for view related queries
+   */
   val computed: String = "_c"
 
   val alias: String = "view"
@@ -34,6 +38,12 @@ private[cosmosdb] object CosmosDBConstants {
   val aggregate: String = AGGREGATE
 
   val selfLink: String = SELF_LINK
+
+  /**
+   * Records the clusterId which performed a change in any document. This can 
vary over
+   * lifetime of a document as different clusters may change the same document 
at different times
+   */
+  val clusterId: String = "_clusterId"
 }
 
 private[cosmosdb] trait CosmosDBUtil {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index d9f4d66..9898317 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -17,16 +17,25 @@
 
 package org.apache.openwhisk.core.database.cosmosdb
 
+import com.typesafe.config.ConfigFactory
 import io.netty.util.ResourceLeakDetector
 import io.netty.util.ResourceLeakDetector.Level
 import org.apache.openwhisk.common.TransactionId
-import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Pending}
-import org.scalatest.junit.JUnitRunner
-import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
-import org.apache.openwhisk.core.entity.WhiskActivation
 import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat,
+  WhiskPackage
+}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Pending}
+import spray.json.JsString
 
 @RunWith(classOf[JUnitRunner])
 class CosmosDBArtifactStoreTests extends FlatSpec with 
CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
@@ -84,6 +93,33 @@ class CosmosDBArtifactStoreTests extends FlatSpec with 
CosmosDBStoreBehaviorBase
     }
   }
 
+  it should "have clusterId set" in {
+    implicit val tid: TransactionId = TransactionId.testing
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    implicit val format = WhiskEntityJsonFormat
+    val conf = ConfigFactory.parseString(s"""
+      | whisk.cosmosdb {
+      |  collections {
+      |     WhiskEntity = {
+      |        cluster-id = "foo"
+      |     }
+      |  }
+      | }
+         """.stripMargin).withFallback(ConfigFactory.load())
+
+    val cosmosDBConfig = CosmosDBConfig(conf, "WhiskEntity")
+    cosmosDBConfig.clusterId shouldBe Some("foo")
+
+    val testConfig = cosmosDBConfig.copy(db = config.db)
+    val store = 
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](testConfig, None)
+
+    val pkg = WhiskPackage(newNS(), aname())
+    val info = put(store, pkg)
+
+    val js = store.getRaw(info.id).futureValue
+    js.get.fields(CosmosDBConstants.clusterId) shouldBe JsString("foo")
+  }
+
   behavior of "CosmosDB query debug"
 
   it should "log query metrics in debug flow" in {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
index 94c36bd..979312e 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -66,7 +66,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
+    cosmos.endpoint shouldBe "http://localhost"
+    cosmos.key shouldBe "foo"
+    cosmos.db shouldBe "openwhisk"
   }
 
   it should "work with extended config" in {
@@ -81,7 +83,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
+    cosmos.endpoint shouldBe "http://localhost"
+    cosmos.key shouldBe "foo"
+    cosmos.db shouldBe "openwhisk"
 
     cosmos.connectionPolicy.maxPoolSize shouldBe 42
     val policy = cosmos.connectionPolicy.asJava
@@ -115,7 +119,9 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
+    cosmos.endpoint shouldBe "http://localhost"
+    cosmos.key shouldBe "foo"
+    cosmos.db shouldBe "openwhisk"
 
     val policy = cosmos.connectionPolicy.asJava
     policy.isUsingMultipleWriteLocations shouldBe true
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
index 1dc4ed2..18fbf29 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -37,7 +37,7 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with 
ArtifactStoreBehaviorBase
 
   override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
 
-  private lazy val config: CosmosDBConfig = storeConfig.copy(db = 
createTestDB().getId)
+  protected lazy val config: CosmosDBConfig = storeConfig.copy(db = 
createTestDB().getId)
 
   override lazy val authStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader

Reply via email to