This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d2bb75  Add support for TTL at collection level with CosmosDB (#4229)
0d2bb75 is described below

commit 0d2bb75d32a17d6b19add74173ecfb721dde5803
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Mon Feb 11 10:55:46 2019 -0800

    Add support for TTL at collection level with CosmosDB (#4229)
    
    * Add support for TTL at collection level
---
 common/scala/src/main/resources/application.conf   | 32 ++++++----
 .../database/cosmosdb/CosmosDBArtifactStore.scala  |  5 +-
 .../cosmosdb/CosmosDBArtifactStoreProvider.scala   | 13 ++--
 .../core/database/cosmosdb/CosmosDBConfig.scala    |  3 +-
 .../core/database/cosmosdb/CosmosDBSupport.scala   |  2 +
 .../database/cosmosdb/CosmosDBConfigTests.scala    |  6 +-
 .../database/cosmosdb/CosmosDBSupportTests.scala   | 70 +++++++++++++++++++++-
 7 files changed, 106 insertions(+), 25 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 2ae1a69..857151e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -159,9 +159,9 @@ whisk {
     # CosmosDB related configuration
     # For example:
     cosmosdb {
-    #   endpoint          =               # Endpoint URL like 
https://<account>.documents.azure.com:443/
-    #   key               =               # Access key
-    #   db                =               # Database name
+        # endpoint          =               # Endpoint URL like 
https://<account>.documents.azure.com:443/
+        # key               =               # Access key
+        # db                =               # Database name
         # Throughput configured for each collection within this db
         # This is configured only if collection is created fresh. If collection
         # already exists then existing throughput would be used
@@ -169,6 +169,11 @@ whisk {
         # Select from one of the supported
         # 
https://azure.github.io/azure-cosmosdb-java/1.0.0/com/microsoft/azure/cosmosdb/ConsistencyLevel.html
         consistency-level = "Session"
+
+        # TTL duration. By default no TTL is set unless explicitly configured
+        # ENABLING THIS VALUE MEANS YOUR DATA WILL NOT BE PERMANENTLY STORED
+        # Can only be used for `WhiskActivation` for now
+        # time-to-live      = 60 s
         connection-policy {
             max-pool-size = 1000
             # When the value of this property is true, the SDK will direct 
write operations to
@@ -191,15 +196,18 @@ whisk {
         }
 
         # Specify entity specific overrides below. By default all config 
values would be picked from top level. To override
-        # any config option for specific entity specify them below. For 
example if multiple writes need to be enabled
-        # for activations then
-    #   collections {
-    #       WhiskActivation {            # Add entity specific overrides here
-    #           connection-policy {
-    #              using-multiple-write-locations = true
-    #            }
-    #       }
-    #   }
+        # any config option for specific entity specify them below. Following 
names can be used
+        #   - WhiskAuth
+        #   - WhiskEntity
+        #   - WhiskActivation
+        # For example if multiple writes need to be enabled for activations 
then
+        # collections {
+        #   WhiskActivation {            # Add entity specific overrides here
+        #     connection-policy {
+        #        using-multiple-write-locations = true
+        #     }
+        #   }
+        # }
     }
 
     # transaction ID related configuration
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 88ebb36..d65761b 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -62,7 +62,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   val attachmentScheme: String = 
attachmentStore.map(_.scheme).getOrElse(cosmosScheme)
 
   protected val client: AsyncDocumentClient = clientRef.get.client
-  private val (database, collection) = initialize()
+  private[cosmosdb] val (database, collection) = initialize()
 
   private val _id = "_id"
   private val _rev = "_rev"
@@ -77,7 +77,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   logging.info(
     this,
     s"Initializing CosmosDBArtifactStore for collection [$collName]. Service 
endpoint [${client.getServiceEndpoint}], " +
-      s"Read endpoint [${client.getReadEndpoint}], Write endpoint 
[${client.getWriteEndpoint}], Connection Policy 
[${client.getConnectionPolicy}]")
+      s"Read endpoint [${client.getReadEndpoint}], Write endpoint 
[${client.getWriteEndpoint}], Connection Policy 
[${client.getConnectionPolicy}], " +
+      s"Time to live [${collection.getDefaultTimeToLive} secs")
 
   //Clone the returned instance as these are mutable
   def documentCollection(): DocumentCollection = new 
DocumentCollection(collection.toJson)
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index ab59e67..72f9f10 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -58,7 +58,7 @@ object CosmosDBArtifactStoreProvider extends 
ArtifactStoreProvider {
     docReader: DocumentReader,
     actorSystem: ActorSystem,
     logging: Logging,
-    materializer: ActorMaterializer): ArtifactStore[D] = {
+    materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
 
     makeStoreForClient(config, createReference(config).reference(), 
attachmentStore)
   }
@@ -70,7 +70,7 @@ object CosmosDBArtifactStoreProvider extends 
ArtifactStoreProvider {
     docReader: DocumentReader,
     actorSystem: ActorSystem,
     logging: Logging,
-    materializer: ActorMaterializer): ArtifactStore[D] = {
+    materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
 
     val classTag = implicitly[ClassTag[D]]
     val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
@@ -100,7 +100,7 @@ object CosmosDBArtifactStoreProvider extends 
ArtifactStoreProvider {
    * This method ensures that all store instances share same client instance 
and thus the underlying connection pool.
    * Synchronization is required to ensure concurrent init of various store 
instances share same ref instance
    */
-  private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
+  private def getOrCreateReference[D <: DocumentSerializer: ClassTag](config: 
CosmosDBConfig) = synchronized {
     val clientRef = clients.getOrElseUpdate(config, createReference(config))
     if (clientRef.isClosed) {
       val newRef = createReference(config)
@@ -111,7 +111,12 @@ object CosmosDBArtifactStoreProvider extends 
ArtifactStoreProvider {
     }
   }
 
-  private def createReference(config: CosmosDBConfig) =
+  private def createReference[D <: DocumentSerializer: ClassTag](config: 
CosmosDBConfig) = {
+    val clazz = implicitly[ClassTag[D]].runtimeClass
+    if (clazz != classOf[WhiskActivation]) {
+      require(config.timeToLive.isEmpty, s"'timeToLive' should not  be 
specified for ${clazz.getSimpleName}")
+    }
     new ReferenceCounted[ClientHolder](ClientHolder(config.createClient()))
+  }
 
 }
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index 0c93917..38332b3 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -35,7 +35,8 @@ case class CosmosDBConfig(endpoint: String,
                           db: String,
                           throughput: Int,
                           consistencyLevel: ConsistencyLevel,
-                          connectionPolicy: ConnectionPolicy) {
+                          connectionPolicy: ConnectionPolicy,
+                          timeToLive: Option[Duration]) {
 
   def createClient(): AsyncDocumentClient = {
     new AsyncDocumentClient.Builder()
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
index c534911..da9614f 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
@@ -69,6 +69,8 @@ private[cosmosdb] trait CosmosDBSupport extends 
RxObservableImplicits with Cosmo
     defn.setId(collName)
     defn.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
     defn.setPartitionKey(viewMapper.partitionKeyDefn)
+    val ttl = config.timeToLive.map(_.toSeconds.toInt).getOrElse(-1)
+    defn.setDefaultTimeToLive(ttl)
     defn
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
index 5e6e1cd..09a96f8 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -66,7 +66,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _) => }
+    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
   }
 
   it should "work with extended config" in {
@@ -81,7 +81,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _) => }
+    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
 
     cosmos.connectionPolicy.maxPoolSize shouldBe 42
     val policy = cosmos.connectionPolicy.asJava
@@ -114,7 +114,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
       | }
          """.stripMargin).withFallback(globalConfig)
     val cosmos = CosmosDBConfig(config, "WhiskAuth")
-    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _) => }
+    cosmos should matchPattern { case CosmosDBConfig("http://localhost", 
"foo", "openwhisk", _, _, _, _) => }
 
     val policy = cosmos.connectionPolicy.asJava
     policy.isUsingMultipleWriteLocations shouldBe true
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
index 57f57aa..478463f 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -17,28 +17,48 @@
 
 package org.apache.openwhisk.core.database.cosmosdb
 
+import akka.stream.ActorMaterializer
 import com.microsoft.azure.cosmosdb.IndexKind.Hash
 import com.microsoft.azure.cosmosdb.DataType.String
 import com.microsoft.azure.cosmosdb.DocumentCollection
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import com.typesafe.config.ConfigFactory
+import common.{StreamLogging, WskActorSystem}
+import org.apache.openwhisk.core.entity.{
+  DocumentReader,
+  WhiskActivation,
+  WhiskDocumentReader,
+  WhiskEntity,
+  WhiskEntityJsonFormat
+}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{FlatSpec, Matchers}
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.DurationInt
 
 @RunWith(classOf[JUnitRunner])
-class CosmosDBSupportTests extends FlatSpec with CosmosDBTestSupport with 
MockFactory with Matchers {
+class CosmosDBSupportTests
+    extends FlatSpec
+    with CosmosDBTestSupport
+    with MockFactory
+    with Matchers
+    with StreamLogging
+    with WskActorSystem {
+
+  behavior of "CosmosDB init"
 
-  behavior of "index"
+  protected implicit val materializer: ActorMaterializer = ActorMaterializer()
 
-  it should "be created and updated on init" in {
+  it should "create and update index" in {
     val testDb = createTestDB()
     val config: CosmosDBConfig = storeConfig.copy(db = testDb.getId)
 
     val indexedPaths1 = Set("/foo/?", "/bar/?")
     val (_, coll) = new CosmosTest(config, client, 
newMapper(indexedPaths1)).initialize()
+    coll.getDefaultTimeToLive shouldBe -1
     indexedPaths(coll) should contain theSameElementsAs indexedPaths1
 
     //Test if index definition is updated in code it gets updated in db also
@@ -47,6 +67,50 @@ class CosmosDBSupportTests extends FlatSpec with 
CosmosDBTestSupport with MockFa
     indexedPaths(coll2) should contain theSameElementsAs indexedPaths2
   }
 
+  it should "set ttl" in {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    val config = ConfigFactory.parseString(s"""
+      | whisk.cosmosdb {
+      |  collections {
+      |     WhiskActivation = {
+      |        time-to-live = 60 s
+      |     }
+      |  }
+      | }
+         """.stripMargin).withFallback(ConfigFactory.load())
+
+    val cosmosDBConfig = CosmosDBConfig(config, "WhiskActivation")
+    cosmosDBConfig.timeToLive shouldBe Some(60.seconds)
+
+    val testDb = createTestDB()
+    val testConfig = cosmosDBConfig.copy(db = testDb.getId)
+    val coll = 
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskActivation](testConfig, 
None).collection
+    coll.getDefaultTimeToLive shouldBe 60.seconds.toSeconds
+  }
+
+  it should "not set ttl for WhiskEntity" in {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    implicit val format = WhiskEntityJsonFormat
+    val config = ConfigFactory.parseString(s"""
+      | whisk.cosmosdb {
+      |  collections {
+      |     WhiskEntity = {
+      |        time-to-live = 60 s
+      |     }
+      |  }
+      | }
+         """.stripMargin).withFallback(ConfigFactory.load())
+
+    val cosmosDBConfig = CosmosDBConfig(config, "WhiskEntity")
+    cosmosDBConfig.timeToLive shouldBe Some(60.seconds)
+
+    val testConfig = cosmosDBConfig.copy(db = "foo")
+
+    an[IllegalArgumentException] shouldBe thrownBy {
+      CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](testConfig, 
None).collection
+    }
+  }
+
   private def newMapper(paths: Set[String]) = {
     val mapper = stub[CosmosDBViewMapper]
     mapper.indexingPolicy _ when () returns newTestIndexingPolicy(paths)

Reply via email to