This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b904b56  CosmosDB cache invalidator refactor (#4890)
b904b56 is described below

commit b904b56cc26dc3edf31c5a3721843e3e2cb1418e
Author: tysonnorris <[email protected]>
AuthorDate: Tue Jul 21 09:44:44 2020 -0700

    CosmosDB cache invalidator refactor (#4890)
    
    * refactor cache-invalidator to be more defensive with cosmos+kafka failures
---
 .../src/main/resources/application.conf            |   2 +
 .../database/cosmosdb/cache/CacheInvalidator.scala |  82 ++++++++++----
 .../cosmosdb/cache/ChangeFeedConsumer.scala        |  98 +++++++++-------
 .../cosmosdb/cache/KafkaEventProducer.scala        |  27 +++--
 .../core/database/cosmosdb/cache/Main.scala        |  22 +++-
 .../cosmosdb/cache/CacheInvalidatorTests.scala     | 123 +++++++++++++++++++--
 6 files changed, 275 insertions(+), 79 deletions(-)

diff --git 
a/core/cosmosdb/cache-invalidator/src/main/resources/application.conf 
b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
index 3baafbb..2a698f0 100644
--- a/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/application.conf
@@ -20,5 +20,7 @@ akka.kafka.producer {
   # can be defined in this configuration section.
   kafka-clients {
     bootstrap.servers = ${?KAFKA_HOSTS}
+    //change the default producer timeout so that it will quickly fail when 
kafka topic cannot be written
+    max.block.ms = 2000
   }
 }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
index 6a97bcc..04472f1 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -20,40 +20,77 @@ import akka.Done
 import akka.actor.{ActorSystem, CoordinatedShutdown}
 import akka.kafka.ProducerSettings
 import akka.stream.ActorMaterializer
-import com.google.common.base.Throwables
 import com.typesafe.config.Config
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.openwhisk.common.Logging
 import 
org.apache.openwhisk.core.database.RemoteCacheInvalidation.cacheInvalidationTopic
 
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-object CacheInvalidator {
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Success
 
+class CacheInvalidator(globalConfig: Config)(implicit system: ActorSystem,
+                                             materializer: ActorMaterializer,
+                                             log: Logging) {
+  import CacheInvalidator._
   val instanceId = "cache-invalidator"
   val whisksCollection = "whisks"
+  implicit val ec: ExecutionContext = system.dispatcher
 
-  def start(
-    globalConfig: Config)(implicit system: ActorSystem, materializer: 
ActorMaterializer, log: Logging): Future[Done] = {
-    implicit val ec: ExecutionContext = system.dispatcher
-    val config = CacheInvalidatorConfig(globalConfig)
-    val producer =
-      KafkaEventProducer(
-        kafkaProducerSettings(defaultProducerConfig(globalConfig)),
-        cacheInvalidationTopic,
-        config.eventProducerConfig)
-    val observer = new WhiskChangeEventObserver(config.invalidatorConfig, 
producer)
-    val feedConsumer = new ChangeFeedConsumer(whisksCollection, config, 
observer)
-    feedConsumer.isStarted.andThen {
-      case Success(_) =>
+  val config = CacheInvalidatorConfig(globalConfig)
+  val producer =
+    KafkaEventProducer(
+      kafkaProducerSettings(defaultProducerConfig(globalConfig)),
+      cacheInvalidationTopic,
+      config.eventProducerConfig)
+  val observer = new WhiskChangeEventObserver(config.invalidatorConfig, 
producer)
+  val feedConsumer: ChangeFeedConsumer = new 
ChangeFeedConsumer(whisksCollection, config, observer)
+  val running = Promise[Done]
+
+  def start(): (Future[Done], Future[Done]) = {
+    //If there is a failure at feedConsumer.start, stop everything
+    val startFuture = feedConsumer.start
+    startFuture
+      .map { _ =>
         registerShutdownTasks(system, feedConsumer, producer)
         log.info(this, s"Started the Cache invalidator service. ClusterId 
[${config.invalidatorConfig.clusterId}]")
-      case Failure(t) =>
-        log.error(this, "Error occurred while starting the Consumer" + 
Throwables.getStackTraceAsString(t))
+      }
+      .recover {
+        case t: Throwable =>
+          log.error(this, s"Shutdown after failure to start invalidator: ${t}")
+          stop(Some(t))
+      }
+
+    //If the producer stream fails, stop everything.
+    producer
+      .getStreamFuture()
+      .map(_ => log.info(this, "Successfully completed producer"))
+      .recover {
+        case t: Throwable =>
+          log.error(this, s"Shutdown after producer failure: ${t}")
+          stop(Some(t))
+      }
+
+    (startFuture, running.future)
+  }
+  def stop(error: Option[Throwable])(implicit system: ActorSystem, ec: 
ExecutionContext, log: Logging): Future[Done] = {
+    feedConsumer
+      .close()
+      .andThen {
+        case _ =>
+          producer.close().andThen {
+            case _ =>
+              terminate(error)
+          }
+      }
+  }
+  def terminate(error: Option[Throwable]): Unit = {
+    //make sure that the tracking future is only completed once, even though 
it may be called for various types of failures
+    synchronized {
+      if (!running.isCompleted) {
+        error.map(running.failure).getOrElse(running.success(Done))
+      }
     }
   }
-
   private def registerShutdownTasks(system: ActorSystem,
                                     feedConsumer: ChangeFeedConsumer,
                                     producer: KafkaEventProducer)(implicit ec: 
ExecutionContext, log: Logging): Unit = {
@@ -68,7 +105,8 @@ object CacheInvalidator {
         }
     }
   }
-
+}
+object CacheInvalidator {
   def kafkaProducerSettings(config: Config): ProducerSettings[String, String] =
     ProducerSettings(config, new StringSerializer, new StringSerializer)
 
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
index 27afef7..0472bc5 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
@@ -48,52 +48,74 @@ class ChangeFeedConsumer(collName: String, config: 
CacheInvalidatorConfig, obser
   import ChangeFeedConsumer._
 
   log.info(this, s"Watching changes in $collName with lease managed in 
${config.feedConfig.leaseCollection}")
+  val clients = scala.collection.mutable.Map[ConnectionInfo, 
CosmosClient]().withDefault(createCosmosClient)
 
-  private val clients = scala.collection.mutable.Map[ConnectionInfo, 
CosmosClient]().withDefault(createCosmosClient)
-  private val targetContainer = getContainer(collName)
-  private val leaseContainer = getContainer(config.feedConfig.leaseCollection, 
createIfNotExist = true)
-  private val (processor, startFuture) = {
-    val clusterId = config.invalidatorConfig.clusterId
-    val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
-
-    val feedOpts = new ChangeFeedProcessorOptions
-    feedOpts.leasePrefix(prefix)
-    feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
-
-    val builder = ChangeFeedProcessor.Builder
-      .hostName(config.feedConfig.hostname)
-      .feedContainer(targetContainer)
-      .leaseContainer(leaseContainer)
-      .options(feedOpts)
-      .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not 
exposed hence need to cast to impl
-
-    builder.observerFactory(() => ObserverBridge)
-    val p = builder.build()
-    (p, p.start().toFuture.toScala.map(_ => Done))
-  }
+  var processor: Option[ChangeFeedProcessor] = None
+  def start: Future[Done] = {
 
-  def isStarted: Future[Done] = startFuture
+    def getContainer(name: String, createIfNotExist: Boolean = false): 
CosmosContainer = {
+      val info = config.getCollectionInfo(name)
+      val client = clients(info)
+      val db = client.getDatabase(info.db)
+      val container = db.getContainer(name)
 
-  def close(): Future[Done] = {
-    val f = processor.stop().toFuture.toScala.map(_ => Done)
-    f.andThen {
-      case _ =>
-        clients.values.foreach(c => c.close())
-        Future.successful(Done)
+      val resp = if (createIfNotExist) {
+        db.createContainerIfNotExists(name, "/id", info.throughput)
+      } else container.read()
+
+      resp.block().container()
     }
+    try {
+      val targetContainer = getContainer(collName)
+      val leaseContainer = getContainer(config.feedConfig.leaseCollection, 
createIfNotExist = true)
+
+      val clusterId = config.invalidatorConfig.clusterId
+      val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
+
+      val feedOpts = new ChangeFeedProcessorOptions
+      feedOpts.leasePrefix(prefix)
+      feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
+
+      val builder = ChangeFeedProcessor.Builder
+        .hostName(config.feedConfig.hostname)
+        .feedContainer(targetContainer)
+        .leaseContainer(leaseContainer)
+        .options(feedOpts)
+        .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not 
exposed hence need to cast to impl
+
+      builder.observerFactory(() => ObserverBridge)
+      val p = builder.build()
+
+      processor = Some(p)
+      p.start().toFuture.toScala.map(_ => Done)
+    } catch {
+      case t: Throwable => Future.failed(t)
+    }
+
   }
 
-  private def getContainer(name: String, createIfNotExist: Boolean = false): 
CosmosContainer = {
-    val info = config.getCollectionInfo(name)
-    val client = clients(info)
-    val db = client.getDatabase(info.db)
-    val container = db.getContainer(name)
+  def close(): Future[Done] = {
 
-    val resp = if (createIfNotExist) {
-      db.createContainerIfNotExists(name, "/id", info.throughput)
-    } else container.read()
+    processor
+      .map { p =>
+        // be careful about exceptions thrown during ChangeFeedProcessor.stop()
+        // e.g. calling stop() before start() completed, etc will throw 
exceptions
+        try {
+          p.stop().toFuture.toScala.map(_ => Done)
+        } catch {
+          case t: Throwable =>
+            log.warn(this, s"Failed to stop processor ${t}")
+            Future.failed(t)
+        }
+      }
+      .getOrElse(Future.successful(Done))
+      .andThen {
+        case _ =>
+          log.info(this, "Closing cosmos clients.")
+          clients.values.foreach(c => c.close())
+          Future.successful(Done)
+      }
 
-    resp.block().container()
   }
 
   private object ObserverBridge extends 
com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver {
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
index 6b15e6a..24f353e 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -22,9 +22,10 @@ import akka.actor.ActorSystem
 import akka.kafka.scaladsl.Producer
 import akka.kafka.{ProducerMessage, ProducerSettings}
 import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import akka.stream._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.connector.kafka.KamonMetricsReporter
 
 import scala.collection.immutable.Seq
@@ -33,24 +34,33 @@ import scala.concurrent.{ExecutionContext, Future, Promise}
 case class KafkaEventProducer(
   settings: ProducerSettings[String, String],
   topic: String,
-  eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, 
materializer: ActorMaterializer)
+  eventProducerConfig: EventProducerConfig)(implicit system: ActorSystem, 
materializer: ActorMaterializer, log: Logging)
     extends EventProducer {
   private implicit val executionContext: ExecutionContext = system.dispatcher
 
-  private val queue = Source
-    .queue[(Seq[String], Promise[Done])](eventProducerConfig.bufferSize, 
OverflowStrategy.dropNew) //TODO Use backpressure
+  private val (queue, stream) = Source
+    .queue[(Seq[String], Promise[Done])](eventProducerConfig.bufferSize, 
OverflowStrategy.fail) //TODO Use backpressure
     .map {
       case (msgs, p) =>
+        log.info(this, s"Sending ${msgs.size} messages to kafka.")
         ProducerMessage.multi(msgs.map(newRecord), p)
     }
     .via(Producer.flexiFlow(producerSettings))
     .map {
-      case ProducerMessage.MultiResult(_, passThrough) =>
+      case ProducerMessage.MultiResult(r, passThrough) =>
+        log.info(this, s"Produced ${r.size} messages.")
         passThrough.success(Done)
-      case _ => //As we use multi mode only other modes need not be handled
+      case _ =>
+      //As we use multi mode only other modes need not be handled
     }
-    .toMat(Sink.ignore)(Keep.left)
-    .run
+    .recover {
+      case t: Throwable =>
+        //this will happen in case of shutdown while items are still queued, 
i.e. if producer cannot connect
+        throw (t)
+    }
+    .toMat(Sink.ignore)(Keep.both)
+    .run()
+  def getStreamFuture() = stream
 
   override def send(msg: Seq[String]): Future[Done] = {
     val promise = Promise[Done]
@@ -63,6 +73,7 @@ case class KafkaEventProducer(
   }
 
   def close(): Future[Done] = {
+    log.info(this, "Closing kafka producer.")
     queue.complete()
     queue.watchCompletion()
   }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index 578c724..62d58a1 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -23,17 +23,33 @@ import kamon.Kamon
 import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging}
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
 
+import scala.concurrent.ExecutionContext
+
 object Main {
   def main(args: Array[String]): Unit = {
     implicit val system: ActorSystem = 
ActorSystem("cache-invalidator-actor-system")
     implicit val materializer: ActorMaterializer = ActorMaterializer()
     implicit val log: Logging = new 
AkkaLogging(akka.event.Logging.getLogger(system, this))
-
+    implicit val ec: ExecutionContext = system.dispatcher
     ConfigMXBean.register()
     Kamon.init()
     val port = 
CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
     BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
-    CacheInvalidator.start(system.settings.config)
-    log.info(this, s"Started the server at http://localhost:$port")
+    val (start, finish) = new CacheInvalidator(system.settings.config).start()
+    start
+      .map(_ => log.info(this, s"Started the server at 
http://localhost:$port"))
+    finish
+      .andThen {
+        case _ =>
+          Kamon.stopModules().andThen {
+            case _ =>
+              system.terminate().andThen {
+                case _ =>
+                  //it is possible that the cosmos sdk reactor system does not 
cleanly shut down, so we will explicitly terminate jvm here.
+                  log.info(this, "Exiting")
+                  sys.exit(0)
+              }
+          }
+      }
   }
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index 1d83e33..97c6c45 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -16,15 +16,17 @@
  */
 
 package org.apache.openwhisk.core.database.cosmosdb.cache
+import java.net.UnknownHostException
+
 import akka.Done
 import akka.actor.CoordinatedShutdown
 import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
 import akka.stream.ActorMaterializer
 import com.typesafe.config.ConfigFactory
-import common.StreamLogging
 import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
 import org.apache.openwhisk.core.database.{CacheInvalidationMessage, 
RemoteCacheInvalidation}
 import 
org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, 
CosmosDBTestSupport}
 import org.apache.openwhisk.core.entity.{
@@ -52,10 +54,10 @@ class CacheInvalidatorTests
     with CosmosDBTestSupport
     with Matchers
     with ScalaFutures
-    with TryValues
-    with StreamLogging {
+    with TryValues {
 
   private implicit val materializer: ActorMaterializer = ActorMaterializer()
+  private implicit val logging = new AkkaLogging(system.log)
   implicit override val patienceConfig: PatienceConfig = 
PatienceConfig(timeout = 300.seconds)
 
   override def createKafkaConfig: EmbeddedKafkaConfig = 
EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
@@ -81,7 +83,9 @@ class CacheInvalidatorTests
     val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), 
EntityName(randomString()))
 
     //Start cache invalidator after the db for whisks is created
-    startCacheInvalidator()
+    val cacheInvalidator = startCacheInvalidator()
+    val (start, finish) = cacheInvalidator.start()
+    start.futureValue shouldBe Done
     log.info("Cache Invalidator service started")
 
     //Store stuff in db
@@ -97,11 +101,60 @@ class CacheInvalidatorTests
     CacheInvalidationMessage.parse(msgs.head).get.key.mainId shouldBe 
pkg.docid.asString
 
     store.del(info).futureValue
+    cacheInvalidator.stop(None)
+    finish.futureValue shouldBe Done
+  }
+
+  it should "exit if there is a missing kafka broker config" in {
+    implicit val tid = TransactionId.testing
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    implicit val format = WhiskEntityJsonFormat
+    dbName = createTestDB().getId
+    val dbConfig = storeConfig.copy(db = dbName)
+    val store = 
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
+    val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), 
EntityName(randomString()))
+
+    //Start cache invalidator after the db for whisks is created
+    val cacheInvalidator = startCacheInvalidatorWithoutKafka()
+    val (start, finish) = cacheInvalidator.start()
+    start.futureValue shouldBe Done
+    log.info("Cache Invalidator service started")
+    //when kafka config is missing, we expect KafkaException from producer 
immediately (although stopping feed processor takes some time)
+    finish.failed.futureValue shouldBe an[KafkaException]
+  }
+  it should "exit if kafka is not consuming" in {
+    implicit val tid = TransactionId.testing
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    implicit val format = WhiskEntityJsonFormat
+    dbName = createTestDB().getId
+    val dbConfig = storeConfig.copy(db = dbName)
+    val store = 
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
+    val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), 
EntityName(randomString()))
+
+    //Start cache invalidator with a bogus kafka broker after the db for 
whisks is created
+    val cacheInvalidator = startCacheInvalidatorWithInvalidKafka()
+    val (start, finish) = cacheInvalidator.start()
+    start.futureValue shouldBe Done
+    log.info("Cache Invalidator service started")
+
+    //Store stuff in db
+    val info = store.put(pkg).futureValue
+    log.info(s"Added document ${info.id}")
+    //when we cannot connect to kafka, we expect KafkaException from producer 
after timeout
+    finish.failed.futureValue shouldBe an[KafkaException]
+  }
+
+  it should "exit if there is a bad db config" in {
+    //Start cache invalidator after the db for whisks is created
+    val cacheInvalidator = startCacheInvalidatorWithoutCosmos()
+    val (start, finish) = cacheInvalidator.start()
+    //when db config is broken, we expect 
reactor.core.Exceptions$ReactiveException (a non-public RuntimeException)
+    start.failed.futureValue.getCause shouldBe an[UnknownHostException]
   }
 
   private def randomString() = Random.alphanumeric.take(5).mkString
 
-  private def startCacheInvalidator() = {
+  private def startCacheInvalidator(): CacheInvalidator = {
     val tsconfig = ConfigFactory.parseString(s"""
       |akka.kafka.producer {
       |  kafka-clients {
@@ -117,7 +170,61 @@ class CacheInvalidatorTests
       |  }
       |}
       """.stripMargin).withFallback(ConfigFactory.load())
-    CacheInvalidator.start(tsconfig).futureValue shouldBe Done
+    new CacheInvalidator(tsconfig)
+  }
+  private def startCacheInvalidatorWithoutKafka(): CacheInvalidator = {
+    val tsconfig = ConfigFactory.parseString(s"""
+      |akka.kafka.producer {
+      |  kafka-clients {
+      |    #this config is missing
+      |  }
+      |}
+      |whisk {
+      |  cache-invalidator {
+      |    cosmosdb {
+      |      db = "$dbName"
+      |      start-from-beginning  = true
+      |    }
+      |  }
+      |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    new CacheInvalidator(tsconfig)
+  }
+  private def startCacheInvalidatorWithInvalidKafka(): CacheInvalidator = {
+    val tsconfig = ConfigFactory.parseString(s"""
+      |akka.kafka.producer {
+      |  kafka-clients {
+      |    bootstrap.servers = "localhost:9092"
+      |  }
+      |}
+      |whisk {
+      |  cache-invalidator {
+      |    cosmosdb {
+      |      db = "$dbName"
+      |      start-from-beginning  = true
+      |    }
+      |  }
+      |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    new CacheInvalidator(tsconfig)
+  }
+  private def startCacheInvalidatorWithoutCosmos(): CacheInvalidator = {
+    val tsconfig = ConfigFactory.parseString(s"""
+      |akka.kafka.producer {
+      |  kafka-clients {
+      |    bootstrap.servers = "$server"
+      |  }
+      |}
+      |whisk {
+      |  cache-invalidator {
+      |    cosmosdb {
+      |      db = "$dbName"
+      |      endpoint = 
"https://BADENDPOINT-nobody-home.documents.azure.com:443/"
+      |      start-from-beginning  = true
+      |    }
+      |  }
+      |}
+      """.stripMargin).withFallback(ConfigFactory.load())
+    new CacheInvalidator(tsconfig)
   }
-
 }

Reply via email to