This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b904b56 CosmosDB cache invalidator refactor (#4890)
b904b56 is described below
commit b904b56cc26dc3edf31c5a3721843e3e2cb1418e
Author: tysonnorris <[email protected]>
AuthorDate: Tue Jul 21 09:44:44 2020 -0700
CosmosDB cache invalidator refactor (#4890)
* refactor cache-invalidator to be more defensive with cosmos+kafka failures
---
.../src/main/resources/application.conf | 2 +
.../database/cosmosdb/cache/CacheInvalidator.scala | 82 ++++++++++----
.../cosmosdb/cache/ChangeFeedConsumer.scala | 98 +++++++++-------
.../cosmosdb/cache/KafkaEventProducer.scala | 27 +++--
.../core/database/cosmosdb/cache/Main.scala | 22 +++-
.../cosmosdb/cache/CacheInvalidatorTests.scala | 123 +++++++++++++++++++--
6 files changed, 275 insertions(+), 79 deletions(-)
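For context, a minimal sketch of the new lifecycle contract introduced here (mirroring the Main.scala change further below; illustrative only, assuming an implicit ActorSystem, ActorMaterializer, Logging and ExecutionContext are in scope):

    import scala.util.{Failure, Success}

    val invalidator = new CacheInvalidator(system.settings.config)
    val (started, finished) = invalidator.start()
    // first future completes once the Cosmos change feed consumer is up
    started.foreach(_ => log.info(this, "cache invalidator started"))
    // second future completes (or fails) when the service stops, e.g. after a Kafka or Cosmos failure
    finished.onComplete {
      case Success(_) => log.info(this, "cache invalidator stopped cleanly")
      case Failure(t) => log.error(this, s"cache invalidator stopped after failure: $t")
    }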
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/application.conf b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
index 3baafbb..2a698f0 100644
--- a/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
@@ -20,5 +20,7 @@ akka.kafka.producer {
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers = ${?KAFKA_HOSTS}
+    //change the default producer timeout so that it will quickly fail when kafka topic cannot be written
+ max.block.ms = 2000
}
}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
index 6a97bcc..04472f1 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -20,40 +20,77 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.kafka.ProducerSettings
import akka.stream.ActorMaterializer
-import com.google.common.base.Throwables
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.database.RemoteCacheInvalidation.cacheInvalidationTopic
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-object CacheInvalidator {
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Success
+class CacheInvalidator(globalConfig: Config)(implicit system: ActorSystem,
+ materializer: ActorMaterializer,
+ log: Logging) {
+ import CacheInvalidator._
val instanceId = "cache-invalidator"
val whisksCollection = "whisks"
+ implicit val ec: ExecutionContext = system.dispatcher
- def start(
-    globalConfig: Config)(implicit system: ActorSystem, materializer: ActorMaterializer, log: Logging): Future[Done] = {
- implicit val ec: ExecutionContext = system.dispatcher
- val config = CacheInvalidatorConfig(globalConfig)
- val producer =
- KafkaEventProducer(
- kafkaProducerSettings(defaultProducerConfig(globalConfig)),
- cacheInvalidationTopic,
- config.eventProducerConfig)
-    val observer = new WhiskChangeEventObserver(config.invalidatorConfig, producer)
-    val feedConsumer = new ChangeFeedConsumer(whisksCollection, config, observer)
- feedConsumer.isStarted.andThen {
- case Success(_) =>
+ val config = CacheInvalidatorConfig(globalConfig)
+ val producer =
+ KafkaEventProducer(
+ kafkaProducerSettings(defaultProducerConfig(globalConfig)),
+ cacheInvalidationTopic,
+ config.eventProducerConfig)
+  val observer = new WhiskChangeEventObserver(config.invalidatorConfig, producer)
+  val feedConsumer: ChangeFeedConsumer = new ChangeFeedConsumer(whisksCollection, config, observer)
+ val running = Promise[Done]
+
+ def start(): (Future[Done], Future[Done]) = {
+ //If there is a failure at feedConsumer.start, stop everything
+ val startFuture = feedConsumer.start
+ startFuture
+ .map { _ =>
registerShutdownTasks(system, feedConsumer, producer)
log.info(this, s"Started the Cache invalidator service. ClusterId [${config.invalidatorConfig.clusterId}]")
- case Failure(t) =>
-      log.error(this, "Error occurred while starting the Consumer" + Throwables.getStackTraceAsString(t))
+ }
+ .recover {
+ case t: Throwable =>
+ log.error(this, s"Shutdown after failure to start invalidator: ${t}")
+ stop(Some(t))
+ }
+
+ //If the producer stream fails, stop everything.
+ producer
+ .getStreamFuture()
+ .map(_ => log.info(this, "Successfully completed producer"))
+ .recover {
+ case t: Throwable =>
+ log.error(this, s"Shutdown after producer failure: ${t}")
+ stop(Some(t))
+ }
+
+ (startFuture, running.future)
+ }
+  def stop(error: Option[Throwable])(implicit system: ActorSystem, ec: ExecutionContext, log: Logging): Future[Done] = {
+ feedConsumer
+ .close()
+ .andThen {
+ case _ =>
+ producer.close().andThen {
+ case _ =>
+ terminate(error)
+ }
+ }
+ }
+ def terminate(error: Option[Throwable]): Unit = {
+    //make sure that the tracking future is only completed once, even though it may be called for various types of failures
+ synchronized {
+ if (!running.isCompleted) {
+ error.map(running.failure).getOrElse(running.success(Done))
+ }
}
}
-
private def registerShutdownTasks(system: ActorSystem,
feedConsumer: ChangeFeedConsumer,
producer: KafkaEventProducer)(implicit ec: ExecutionContext, log: Logging): Unit = {
@@ -68,7 +105,8 @@ object CacheInvalidator {
}
}
}
-
+}
+object CacheInvalidator {
def kafkaProducerSettings(config: Config): ProducerSettings[String, String] =
ProducerSettings(config, new StringSerializer, new StringSerializer)
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
index 27afef7..0472bc5 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
@@ -48,52 +48,74 @@ class ChangeFeedConsumer(collName: String, config: CacheInvalidatorConfig, obser
import ChangeFeedConsumer._
log.info(this, s"Watching changes in $collName with lease managed in ${config.feedConfig.leaseCollection}")
+  val clients = scala.collection.mutable.Map[ConnectionInfo, CosmosClient]().withDefault(createCosmosClient)
-  private val clients = scala.collection.mutable.Map[ConnectionInfo, CosmosClient]().withDefault(createCosmosClient)
- private val targetContainer = getContainer(collName)
-  private val leaseContainer = getContainer(config.feedConfig.leaseCollection, createIfNotExist = true)
- private val (processor, startFuture) = {
- val clusterId = config.invalidatorConfig.clusterId
- val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
-
- val feedOpts = new ChangeFeedProcessorOptions
- feedOpts.leasePrefix(prefix)
- feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
-
- val builder = ChangeFeedProcessor.Builder
- .hostName(config.feedConfig.hostname)
- .feedContainer(targetContainer)
- .leaseContainer(leaseContainer)
- .options(feedOpts)
-      .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not exposed hence need to cast to impl
-
- builder.observerFactory(() => ObserverBridge)
- val p = builder.build()
- (p, p.start().toFuture.toScala.map(_ => Done))
- }
+ var processor: Option[ChangeFeedProcessor] = None
+ def start: Future[Done] = {
- def isStarted: Future[Done] = startFuture
+    def getContainer(name: String, createIfNotExist: Boolean = false): CosmosContainer = {
+ val info = config.getCollectionInfo(name)
+ val client = clients(info)
+ val db = client.getDatabase(info.db)
+ val container = db.getContainer(name)
- def close(): Future[Done] = {
- val f = processor.stop().toFuture.toScala.map(_ => Done)
- f.andThen {
- case _ =>
- clients.values.foreach(c => c.close())
- Future.successful(Done)
+ val resp = if (createIfNotExist) {
+ db.createContainerIfNotExists(name, "/id", info.throughput)
+ } else container.read()
+
+ resp.block().container()
}
+ try {
+ val targetContainer = getContainer(collName)
+      val leaseContainer = getContainer(config.feedConfig.leaseCollection, createIfNotExist = true)
+
+ val clusterId = config.invalidatorConfig.clusterId
+ val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
+
+ val feedOpts = new ChangeFeedProcessorOptions
+ feedOpts.leasePrefix(prefix)
+ feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
+
+ val builder = ChangeFeedProcessor.Builder
+ .hostName(config.feedConfig.hostname)
+ .feedContainer(targetContainer)
+ .leaseContainer(leaseContainer)
+ .options(feedOpts)
+        .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not exposed hence need to cast to impl
+
+ builder.observerFactory(() => ObserverBridge)
+ val p = builder.build()
+
+ processor = Some(p)
+ p.start().toFuture.toScala.map(_ => Done)
+ } catch {
+ case t: Throwable => Future.failed(t)
+ }
+
}
-  private def getContainer(name: String, createIfNotExist: Boolean = false): CosmosContainer = {
- val info = config.getCollectionInfo(name)
- val client = clients(info)
- val db = client.getDatabase(info.db)
- val container = db.getContainer(name)
+ def close(): Future[Done] = {
- val resp = if (createIfNotExist) {
- db.createContainerIfNotExists(name, "/id", info.throughput)
- } else container.read()
+ processor
+ .map { p =>
+ // be careful about exceptions thrown during ChangeFeedProcessor.stop()
+          // e.g. calling stop() before start() completed, etc will throw exceptions
+ try {
+ p.stop().toFuture.toScala.map(_ => Done)
+ } catch {
+ case t: Throwable =>
+ log.warn(this, s"Failed to stop processor ${t}")
+ Future.failed(t)
+ }
+ }
+ .getOrElse(Future.successful(Done))
+ .andThen {
+ case _ =>
+ log.info(this, "Closing cosmos clients.")
+ clients.values.foreach(c => c.close())
+ Future.successful(Done)
+ }
- resp.block().container()
}
private object ObserverBridge extends com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver {
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
index 6b15e6a..24f353e 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -22,9 +22,10 @@ import akka.actor.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import akka.stream._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.connector.kafka.KamonMetricsReporter
import scala.collection.immutable.Seq
@@ -33,24 +34,33 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
case class KafkaEventProducer(
settings: ProducerSettings[String, String],
topic: String,
-  eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, materializer: ActorMaterializer)
+  eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, materializer: ActorMaterializer, log: Logging)
extends EventProducer {
private implicit val executionContext: ExecutionContext = system.dispatcher
- private val queue = Source
-    .queue[(Seq[String], Promise[Done])](eventProducerConfig.bufferSize, OverflowStrategy.dropNew) //TODO Use backpressure
+ private val (queue, stream) = Source
+    .queue[(Seq[String], Promise[Done])](eventProducerConfig.bufferSize, OverflowStrategy.fail) //TODO Use backpressure
.map {
case (msgs, p) =>
+ log.info(this, s"Sending ${msgs.size} messages to kafka.")
ProducerMessage.multi(msgs.map(newRecord), p)
}
.via(Producer.flexiFlow(producerSettings))
.map {
- case ProducerMessage.MultiResult(_, passThrough) =>
+ case ProducerMessage.MultiResult(r, passThrough) =>
+ log.info(this, s"Produced ${r.size} messages.")
passThrough.success(Done)
- case _ => //As we use multi mode only other modes need not be handled
+ case _ =>
+ //As we use multi mode only other modes need not be handled
}
- .toMat(Sink.ignore)(Keep.left)
- .run
+ .recover {
+ case t: Throwable =>
+        //this will happen in case of shutdown while items are still queued, i.e. if producer cannot connect
+ throw (t)
+ }
+ .toMat(Sink.ignore)(Keep.both)
+ .run()
+ def getStreamFuture() = stream
override def send(msg: Seq[String]): Future[Done] = {
val promise = Promise[Done]
@@ -63,6 +73,7 @@ case class KafkaEventProducer(
}
def close(): Future[Done] = {
+ log.info(this, "Closing kafka producer.")
queue.complete()
queue.watchCompletion()
}
diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index 578c724..62d58a1 100644
--- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -23,17 +23,33 @@ import kamon.Kamon
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
+import scala.concurrent.ExecutionContext
+
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("cache-invalidator-actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: Logging = new AkkaLogging(akka.event.Logging.getLogger(system, this))
-
+ implicit val ec: ExecutionContext = system.dispatcher
ConfigMXBean.register()
Kamon.init()
val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
- CacheInvalidator.start(system.settings.config)
- log.info(this, s"Started the server at http://localhost:$port")
+ val (start, finish) = new CacheInvalidator(system.settings.config).start()
+ start
+      .map(_ => log.info(this, s"Started the server at http://localhost:$port"))
+ finish
+ .andThen {
+ case _ =>
+ Kamon.stopModules().andThen {
+ case _ =>
+ system.terminate().andThen {
+ case _ =>
+              //it is possible that the cosmos sdk reactor system does not cleanly shut down, so we will explicitly terminate jvm here.
+ log.info(this, "Exiting")
+ sys.exit(0)
+ }
+ }
+ }
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index 1d83e33..97c6c45 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -16,15 +16,17 @@
*/
package org.apache.openwhisk.core.database.cosmosdb.cache
+import java.net.UnknownHostException
+
import akka.Done
import akka.actor.CoordinatedShutdown
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigFactory
-import common.StreamLogging
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation}
import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
import org.apache.openwhisk.core.entity.{
@@ -52,10 +54,10 @@ class CacheInvalidatorTests
with CosmosDBTestSupport
with Matchers
with ScalaFutures
- with TryValues
- with StreamLogging {
+ with TryValues {
private implicit val materializer: ActorMaterializer = ActorMaterializer()
+ private implicit val logging = new AkkaLogging(system.log)
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
@@ -81,7 +83,9 @@ class CacheInvalidatorTests
val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
//Start cache invalidator after the db for whisks is created
- startCacheInvalidator()
+ val cacheInvalidator = startCacheInvalidator()
+ val (start, finish) = cacheInvalidator.start()
+ start.futureValue shouldBe Done
log.info("Cache Invalidator service started")
//Store stuff in db
@@ -97,11 +101,60 @@ class CacheInvalidatorTests
CacheInvalidationMessage.parse(msgs.head).get.key.mainId shouldBe pkg.docid.asString
store.del(info).futureValue
+ cacheInvalidator.stop(None)
+ finish.futureValue shouldBe Done
+ }
+
+ it should "exit if there is a missing kafka broker config" in {
+ implicit val tid = TransactionId.testing
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ implicit val format = WhiskEntityJsonFormat
+ dbName = createTestDB().getId
+ val dbConfig = storeConfig.copy(db = dbName)
+    val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
+    val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
+
+ //Start cache invalidator after the db for whisks is created
+ val cacheInvalidator = startCacheInvalidatorWithoutKafka()
+ val (start, finish) = cacheInvalidator.start()
+ start.futureValue shouldBe Done
+ log.info("Cache Invalidator service started")
+    //when kafka config is missing, we expect KafkaException from producer immediately (although stopping feed processor takes some time)
+ finish.failed.futureValue shouldBe an[KafkaException]
+ }
+ it should "exit if kafka is not consuming" in {
+ implicit val tid = TransactionId.testing
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ implicit val format = WhiskEntityJsonFormat
+ dbName = createTestDB().getId
+ val dbConfig = storeConfig.copy(db = dbName)
+    val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
+    val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
+
+    //Start cache invalidator with a bogus kafka broker after the db for whisks is created
+ val cacheInvalidator = startCacheInvalidatorWithInvalidKafka()
+ val (start, finish) = cacheInvalidator.start()
+ start.futureValue shouldBe Done
+ log.info("Cache Invalidator service started")
+
+ //Store stuff in db
+ val info = store.put(pkg).futureValue
+ log.info(s"Added document ${info.id}")
+    //when we cannot connect to kafka, we expect KafkaException from producer after timeout
+ finish.failed.futureValue shouldBe an[KafkaException]
+ }
+
+ it should "exit if there is a bad db config" in {
+ //Start cache invalidator after the db for whisks is created
+ val cacheInvalidator = startCacheInvalidatorWithoutCosmos()
+ val (start, finish) = cacheInvalidator.start()
+    //when db config is broken, we expect reactor.core.Exceptions$ReactiveException (a non-public RuntimeException)
+ start.failed.futureValue.getCause shouldBe an[UnknownHostException]
}
private def randomString() = Random.alphanumeric.take(5).mkString
- private def startCacheInvalidator() = {
+ private def startCacheInvalidator(): CacheInvalidator = {
val tsconfig = ConfigFactory.parseString(s"""
|akka.kafka.producer {
| kafka-clients {
@@ -117,7 +170,61 @@ class CacheInvalidatorTests
| }
|}
""".stripMargin).withFallback(ConfigFactory.load())
- CacheInvalidator.start(tsconfig).futureValue shouldBe Done
+ new CacheInvalidator(tsconfig)
+ }
+ private def startCacheInvalidatorWithoutKafka(): CacheInvalidator = {
+ val tsconfig = ConfigFactory.parseString(s"""
+ |akka.kafka.producer {
+ | kafka-clients {
+ | #this config is missing
+ | }
+ |}
+ |whisk {
+ | cache-invalidator {
+ | cosmosdb {
+ | db = "$dbName"
+ | start-from-beginning = true
+ | }
+ | }
+ |}
+ """.stripMargin).withFallback(ConfigFactory.load())
+ new CacheInvalidator(tsconfig)
+ }
+ private def startCacheInvalidatorWithInvalidKafka(): CacheInvalidator = {
+ val tsconfig = ConfigFactory.parseString(s"""
+ |akka.kafka.producer {
+ | kafka-clients {
+ | bootstrap.servers = "localhost:9092"
+ | }
+ |}
+ |whisk {
+ | cache-invalidator {
+ | cosmosdb {
+ | db = "$dbName"
+ | start-from-beginning = true
+ | }
+ | }
+ |}
+ """.stripMargin).withFallback(ConfigFactory.load())
+ new CacheInvalidator(tsconfig)
+ }
+ private def startCacheInvalidatorWithoutCosmos(): CacheInvalidator = {
+ val tsconfig = ConfigFactory.parseString(s"""
+ |akka.kafka.producer {
+ | kafka-clients {
+ | bootstrap.servers = "$server"
+ | }
+ |}
+ |whisk {
+ | cache-invalidator {
+ | cosmosdb {
+ | db = "$dbName"
+      |      endpoint = "https://BADENDPOINT-nobody-home.documents.azure.com:443/"
+ | start-from-beginning = true
+ | }
+ | }
+ |}
+ """.stripMargin).withFallback(ConfigFactory.load())
+ new CacheInvalidator(tsconfig)
}
-
}