This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0d2bb75 Add support for TTL at collection level with CosmosDB (#4229)
0d2bb75 is described below
commit 0d2bb75d32a17d6b19add74173ecfb721dde5803
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Mon Feb 11 10:55:46 2019 -0800
Add support for TTL at collection level with CosmosDB (#4229)
* Add support for TTL at collection level
---
common/scala/src/main/resources/application.conf | 32 ++++++----
.../database/cosmosdb/CosmosDBArtifactStore.scala | 5 +-
.../cosmosdb/CosmosDBArtifactStoreProvider.scala | 13 ++--
.../core/database/cosmosdb/CosmosDBConfig.scala | 3 +-
.../core/database/cosmosdb/CosmosDBSupport.scala | 2 +
.../database/cosmosdb/CosmosDBConfigTests.scala | 6 +-
.../database/cosmosdb/CosmosDBSupportTests.scala | 70 +++++++++++++++++++++-
7 files changed, 106 insertions(+), 25 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 2ae1a69..857151e 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -159,9 +159,9 @@ whisk {
# CosmosDB related configuration
# For example:
cosmosdb {
- # endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
- # key = # Access key
- # db = # Database name
+ # endpoint = # Endpoint URL like https://<account>.documents.azure.com:443/
+ # key = # Access key
+ # db = # Database name
# Throughput configured for each collection within this db
# This is configured only if collection is created fresh. If collection
# already exists then existing throughput would be used
@@ -169,6 +169,11 @@ whisk {
# Select from one of the supported
# https://azure.github.io/azure-cosmosdb-java/1.0.0/com/microsoft/azure/cosmosdb/ConsistencyLevel.html
consistency-level = "Session"
+
+ # TTL duration. By default no TTL is set unless explicitly configured
+ # ENABLING THIS VALUE MEANS YOUR DATA WILL NOT BE PERMANENTLY STORED
+ # Can only be used for `WhiskActivation` for now
+ # time-to-live = 60 s
connection-policy {
max-pool-size = 1000
# When the value of this property is true, the SDK will direct write operations to
@@ -191,15 +196,18 @@ whisk {
}
# Specify entity specific overrides below. By default all config values would be picked from top level. To override
- # any config option for specific entity specify them below. For example if multiple writes need to be enabled
- # for activations then
- # collections {
- # WhiskActivation { # Add entity specific overrides here
- # connection-policy {
- # using-multiple-write-locations = true
- # }
- # }
- # }
+ # any config option for specific entity specify them below. Following names can be used
+ # - WhiskAuth
+ # - WhiskEntity
+ # - WhiskActivation
+ # For example if multiple writes need to be enabled for activations then
+ # collections {
+ # WhiskActivation { # Add entity specific overrides here
+ # connection-policy {
+ # using-multiple-write-locations = true
+ # }
+ # }
+ # }
}
# transaction ID related configuration
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 88ebb36..d65761b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -62,7 +62,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val attachmentScheme: String =
attachmentStore.map(_.scheme).getOrElse(cosmosScheme)
protected val client: AsyncDocumentClient = clientRef.get.client
- private val (database, collection) = initialize()
+ private[cosmosdb] val (database, collection) = initialize()
private val _id = "_id"
private val _rev = "_rev"
@@ -77,7 +77,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
logging.info(
this,
s"Initializing CosmosDBArtifactStore for collection [$collName]. Service
endpoint [${client.getServiceEndpoint}], " +
- s"Read endpoint [${client.getReadEndpoint}], Write endpoint
[${client.getWriteEndpoint}], Connection Policy
[${client.getConnectionPolicy}]")
+ s"Read endpoint [${client.getReadEndpoint}], Write endpoint
[${client.getWriteEndpoint}], Connection Policy
[${client.getConnectionPolicy}], " +
+ s"Time to live [${collection.getDefaultTimeToLive} secs")
//Clone the returned instance as these are mutable
def documentCollection(): DocumentCollection = new
DocumentCollection(collection.toJson)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
index ab59e67..72f9f10 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreProvider.scala
@@ -58,7 +58,7 @@ object CosmosDBArtifactStoreProvider extends
ArtifactStoreProvider {
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
makeStoreForClient(config, createReference(config).reference(),
attachmentStore)
}
@@ -70,7 +70,7 @@ object CosmosDBArtifactStoreProvider extends
ArtifactStoreProvider {
docReader: DocumentReader,
actorSystem: ActorSystem,
logging: Logging,
- materializer: ActorMaterializer): ArtifactStore[D] = {
+ materializer: ActorMaterializer): CosmosDBArtifactStore[D] = {
val classTag = implicitly[ClassTag[D]]
val (dbName, handler, viewMapper) = handlerAndMapper(classTag)
@@ -100,7 +100,7 @@ object CosmosDBArtifactStoreProvider extends
ArtifactStoreProvider {
* This method ensures that all store instances share same client instance and thus the underlying connection pool.
* Synchronization is required to ensure concurrent init of various store instances share same ref instance
*/
- private def getOrCreateReference(config: CosmosDBConfig) = synchronized {
+ private def getOrCreateReference[D <: DocumentSerializer: ClassTag](config:
CosmosDBConfig) = synchronized {
val clientRef = clients.getOrElseUpdate(config, createReference(config))
if (clientRef.isClosed) {
val newRef = createReference(config)
@@ -111,7 +111,12 @@ object CosmosDBArtifactStoreProvider extends
ArtifactStoreProvider {
}
}
- private def createReference(config: CosmosDBConfig) =
+ private def createReference[D <: DocumentSerializer: ClassTag](config:
CosmosDBConfig) = {
+ val clazz = implicitly[ClassTag[D]].runtimeClass
+ if (clazz != classOf[WhiskActivation]) {
+ require(config.timeToLive.isEmpty, s"'timeToLive' should not be
specified for ${clazz.getSimpleName}")
+ }
new ReferenceCounted[ClientHolder](ClientHolder(config.createClient()))
+ }
}
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index 0c93917..38332b3 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -35,7 +35,8 @@ case class CosmosDBConfig(endpoint: String,
db: String,
throughput: Int,
consistencyLevel: ConsistencyLevel,
- connectionPolicy: ConnectionPolicy) {
+ connectionPolicy: ConnectionPolicy,
+ timeToLive: Option[Duration]) {
def createClient(): AsyncDocumentClient = {
new AsyncDocumentClient.Builder()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
index c534911..da9614f 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupport.scala
@@ -69,6 +69,8 @@ private[cosmosdb] trait CosmosDBSupport extends
RxObservableImplicits with Cosmo
defn.setId(collName)
defn.setIndexingPolicy(viewMapper.indexingPolicy.asJava())
defn.setPartitionKey(viewMapper.partitionKeyDefn)
+ val ttl = config.timeToLive.map(_.toSeconds.toInt).getOrElse(-1)
+ defn.setDefaultTimeToLive(ttl)
defn
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
index 5e6e1cd..09a96f8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfigTests.scala
@@ -66,7 +66,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _) => }
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
}
it should "work with extended config" in {
@@ -81,7 +81,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _) => }
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
cosmos.connectionPolicy.maxPoolSize shouldBe 42
val policy = cosmos.connectionPolicy.asJava
@@ -114,7 +114,7 @@ class CosmosDBConfigTests extends FlatSpec with Matchers {
| }
""".stripMargin).withFallback(globalConfig)
val cosmos = CosmosDBConfig(config, "WhiskAuth")
- cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _) => }
+ cosmos should matchPattern { case CosmosDBConfig("http://localhost",
"foo", "openwhisk", _, _, _, _) => }
val policy = cosmos.connectionPolicy.asJava
policy.isUsingMultipleWriteLocations shouldBe true
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
index 57f57aa..478463f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSupportTests.scala
@@ -17,28 +17,48 @@
package org.apache.openwhisk.core.database.cosmosdb
+import akka.stream.ActorMaterializer
import com.microsoft.azure.cosmosdb.IndexKind.Hash
import com.microsoft.azure.cosmosdb.DataType.String
import com.microsoft.azure.cosmosdb.DocumentCollection
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
+import com.typesafe.config.ConfigFactory
+import common.{StreamLogging, WskActorSystem}
+import org.apache.openwhisk.core.entity.{
+ DocumentReader,
+ WhiskActivation,
+ WhiskDocumentReader,
+ WhiskEntity,
+ WhiskEntityJsonFormat
+}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import scala.collection.JavaConverters._
+import scala.concurrent.duration.DurationInt
@RunWith(classOf[JUnitRunner])
-class CosmosDBSupportTests extends FlatSpec with CosmosDBTestSupport with
MockFactory with Matchers {
+class CosmosDBSupportTests
+ extends FlatSpec
+ with CosmosDBTestSupport
+ with MockFactory
+ with Matchers
+ with StreamLogging
+ with WskActorSystem {
+
+ behavior of "CosmosDB init"
- behavior of "index"
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()
- it should "be created and updated on init" in {
+ it should "create and update index" in {
val testDb = createTestDB()
val config: CosmosDBConfig = storeConfig.copy(db = testDb.getId)
val indexedPaths1 = Set("/foo/?", "/bar/?")
val (_, coll) = new CosmosTest(config, client,
newMapper(indexedPaths1)).initialize()
+ coll.getDefaultTimeToLive shouldBe -1
indexedPaths(coll) should contain theSameElementsAs indexedPaths1
//Test if index definition is updated in code it gets updated in db also
@@ -47,6 +67,50 @@ class CosmosDBSupportTests extends FlatSpec with
CosmosDBTestSupport with MockFa
indexedPaths(coll2) should contain theSameElementsAs indexedPaths2
}
+ it should "set ttl" in {
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | collections {
+ | WhiskActivation = {
+ | time-to-live = 60 s
+ | }
+ | }
+ | }
+ """.stripMargin).withFallback(ConfigFactory.load())
+
+ val cosmosDBConfig = CosmosDBConfig(config, "WhiskActivation")
+ cosmosDBConfig.timeToLive shouldBe Some(60.seconds)
+
+ val testDb = createTestDB()
+ val testConfig = cosmosDBConfig.copy(db = testDb.getId)
+ val coll =
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskActivation](testConfig,
None).collection
+ coll.getDefaultTimeToLive shouldBe 60.seconds.toSeconds
+ }
+
+ it should "not set ttl for WhiskEntity" in {
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ implicit val format = WhiskEntityJsonFormat
+ val config = ConfigFactory.parseString(s"""
+ | whisk.cosmosdb {
+ | collections {
+ | WhiskEntity = {
+ | time-to-live = 60 s
+ | }
+ | }
+ | }
+ """.stripMargin).withFallback(ConfigFactory.load())
+
+ val cosmosDBConfig = CosmosDBConfig(config, "WhiskEntity")
+ cosmosDBConfig.timeToLive shouldBe Some(60.seconds)
+
+ val testConfig = cosmosDBConfig.copy(db = "foo")
+
+ an[IllegalArgumentException] shouldBe thrownBy {
+ CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](testConfig,
None).collection
+ }
+ }
+
private def newMapper(paths: Set[String]) = {
val mapper = stub[CosmosDBViewMapper]
mapper.indexingPolicy _ when () returns newTestIndexingPolicy(paths)