This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new f69ecb7  Switch to Cosmos SDK v3 for Cache Invalidator Service (#4677)
f69ecb7 is described below

commit f69ecb78eafde2cc8f55ba38570f9bab92cc077a
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Oct 22 14:19:12 2019 +0530

    Switch to Cosmos SDK v3 for Cache Invalidator Service (#4677)
    
    * Switch to Cosmos SDK v3
    * Remove logging related logic as v3 uses slf4j and less chatty in logs
    * Register Kamon metric reporter with Kafka producer
    * Log the server url upon startup
---
 .../core/database/RemoteCacheInvalidation.scala    |  11 +-
 core/cosmosdb/cache-invalidator/build.gradle       |   5 +-
 .../src/main/resources/reference.conf              |  22 +++-
 .../src/main/resources/whisk-logback.xml           |  27 -----
 .../database/cosmosdb/cache/CacheInvalidator.scala |  47 ++++----
 .../cosmosdb/cache/CacheInvalidatorConfig.scala    |  34 ++----
 .../cosmosdb/cache/ChangeFeedConsumer.scala        | 120 +++++++++++++++++++++
 .../cosmosdb/cache/ChangeFeedListener.scala        |  77 -------------
 .../cosmosdb/cache/KafkaEventProducer.scala        |   7 +-
 .../core/database/cosmosdb/cache/Main.scala        |   6 +-
 .../cosmosdb/cache/WhiskChangeEventObserver.scala  |  41 ++++---
 .../cosmosdb/cache/CacheInvalidatorTests.scala     |  20 ++--
 .../cache/WhiskChangeEventObserverTests.scala      |  14 +--
 tools/jenkins/apache/dockerhub.groovy              |   2 +-
 14 files changed, 235 insertions(+), 198 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
index 9fca79e..62d06c8 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
@@ -54,19 +54,18 @@ object CacheInvalidationMessage extends DefaultJsonProtocol 
{
 class RemoteCacheInvalidation(config: WhiskConfig, component: String, 
instance: ControllerInstanceId)(
   implicit logging: Logging,
   as: ActorSystem) {
-
+  import RemoteCacheInvalidation._
   implicit private val ec = 
as.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
-  private val topic = "cacheInvalidation"
   private val instanceId = s"$component${instance.asString}"
 
   private val msgProvider = SpiLoader.get[MessagingProvider]
   private val cacheInvalidationConsumer =
-    msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+    msgProvider.getConsumer(config, s"$cacheInvalidationTopic$instanceId", 
cacheInvalidationTopic, maxPeek = 128)
   private val cacheInvalidationProducer = msgProvider.getProducer(config)
 
   def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
-    cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, 
instanceId)).map(_ => Unit)
+    cacheInvalidationProducer.send(cacheInvalidationTopic, 
CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
   }
 
   private val invalidationFeed = as.actorOf(Props {
@@ -100,3 +99,7 @@ class RemoteCacheInvalidation(config: WhiskConfig, 
component: String, instance:
     invalidationFeed ! MessageFeed.Processed
   }
 }
+
+object RemoteCacheInvalidation {
+  val cacheInvalidationTopic = "cacheInvalidation"
+}
diff --git a/core/cosmosdb/cache-invalidator/build.gradle 
b/core/cosmosdb/cache-invalidator/build.gradle
index 0da319c..74356fc 100644
--- a/core/cosmosdb/cache-invalidator/build.gradle
+++ b/core/cosmosdb/cache-invalidator/build.gradle
@@ -34,10 +34,7 @@ repositories {
 dependencies {
     compile "org.scala-lang:scala-library:${gradle.scala.version}"
     compile project(':common:scala')
-    compile ("com.microsoft.azure:azure-cosmos-changefeedprocessor:0.9.2") {
-        exclude group: 'junit'
-        exclude group: 'commons-logging'
-    }
+    compile "com.microsoft.azure:azure-cosmos:3.3.0"
     compile 
"com.typesafe.akka:akka-stream-kafka_2.12:${gradle.akka_kafka.version}"
 }
 
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf 
b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
index d31607e..e3a64fe 100644
--- a/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
@@ -25,6 +25,17 @@ whisk {
       # Database name
       db                  = ${?COSMOSDB_NAME}
 
+      # ConnectionMode. Can be one of
+      # - GATEWAY
+      # - DIRECT
+      connection-mode     = "GATEWAY"
+
+      # Consistency level used for DB operations
+      consistency-level   = "SESSION"
+
+      # Default throughput used while creating a new lease collection
+      throughput        = 400
+
       # Name of collection in which lease related data is stored
       lease-collection    = "cache-invalidator-lease"
 
@@ -32,6 +43,14 @@ whisk {
       # If multiple instance running then set it to some unique name
       hostname            = "cache-invalidator"
 
+      # Sets a value indicating whether change feed in the Azure Cosmos DB 
service should start from beginning
+      # This is only used when
+      #  1. Lease store is not initialized and is ignored if a lease for 
partition exists and has continuation token
+      #  2. StartContinuation is not specified
+      #  3. StartTime is not specified
+      # This is mostly meant for test purpose where we create both db for 
first time and without this test at times fail
+      start-from-beginning  = false
+
       collections {
         # Provide collection specific connection info here
         # This can be used if lease collection is to be placed in a separate 
endpoint/db
@@ -42,9 +61,6 @@ whisk {
     # HTTP Server port
     port = 8080
 
-    # Timeout for waiting for batch of feed changes to be published as Kafka 
event
-    feed-publish-timeout = 120 s
-
     # Current clusterId - If configured then changes which are done by current 
cluster would be ignored
     # i.e. no cache invalidation event message would be generated for those 
changes
     # cluster-id =
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml 
b/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
deleted file mode 100644
index 70936ac..0000000
--- a/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<included>
-    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
-        <resetJUL>true</resetJUL>
-    </contextListener>
-    <!-- Disable verbose info logging from processor library-->
-    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.services" 
level="WARN" />
-    <logger name="com.microsoft.azure.documentdb.changefeedprocessor.internal" 
level="WARN" />
-
-    <logger 
name="org.apache.openwhisk.core.database.cosmosdb.cache.WhiskChangeEventObserver"
 level="DEBUG" />
-</included>
\ No newline at end of file
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
index e318952..6a97bcc 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -16,30 +16,27 @@
  */
 package org.apache.openwhisk.core.database.cosmosdb.cache
 
+import akka.Done
 import akka.actor.{ActorSystem, CoordinatedShutdown}
-import akka.event.slf4j.SLF4JLogging
 import akka.kafka.ProducerSettings
 import akka.stream.ActorMaterializer
+import com.google.common.base.Throwables
 import com.typesafe.config.Config
 import org.apache.kafka.common.serialization.StringSerializer
-import org.slf4j.bridge.SLF4JBridgeHandler
+import org.apache.openwhisk.common.Logging
+import 
org.apache.openwhisk.core.database.RemoteCacheInvalidation.cacheInvalidationTopic
 
-import scala.concurrent.Future
-import scala.util.Success
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
 
-object CacheInvalidator extends SLF4JLogging {
-  //CosmosDB changefeed support uses Java Logging.
-  // Those needs to be routed to Slf4j
-  SLF4JBridgeHandler.removeHandlersForRootLogger()
-  SLF4JBridgeHandler.install()
-
-  //TODO Replace with constant from RemoteCacheInvalidation
-  val cacheInvalidationTopic = "cacheInvalidation"
+object CacheInvalidator {
 
   val instanceId = "cache-invalidator"
   val whisksCollection = "whisks"
 
-  def start(globalConfig: Config)(implicit system: ActorSystem, materializer: 
ActorMaterializer): Unit = {
+  def start(
+    globalConfig: Config)(implicit system: ActorSystem, materializer: 
ActorMaterializer, log: Logging): Future[Done] = {
+    implicit val ec: ExecutionContext = system.dispatcher
     val config = CacheInvalidatorConfig(globalConfig)
     val producer =
       KafkaEventProducer(
@@ -47,24 +44,26 @@ object CacheInvalidator extends SLF4JLogging {
         cacheInvalidationTopic,
         config.eventProducerConfig)
     val observer = new WhiskChangeEventObserver(config.invalidatorConfig, 
producer)
-    val feedManager = new ChangeFeedManager(whisksCollection, observer, config)
-    registerShutdownTasks(system, feedManager, producer)
-    log.info(s"Started the Cache invalidator service. ClusterId 
[${config.invalidatorConfig.clusterId}]")
+    val feedConsumer = new ChangeFeedConsumer(whisksCollection, config, 
observer)
+    feedConsumer.isStarted.andThen {
+      case Success(_) =>
+        registerShutdownTasks(system, feedConsumer, producer)
+        log.info(this, s"Started the Cache invalidator service. ClusterId 
[${config.invalidatorConfig.clusterId}]")
+      case Failure(t) =>
+        log.error(this, "Error occurred while starting the Consumer" + 
Throwables.getStackTraceAsString(t))
+    }
   }
 
   private def registerShutdownTasks(system: ActorSystem,
-                                    feedManager: ChangeFeedManager,
-                                    producer: KafkaEventProducer): Unit = {
+                                    feedConsumer: ChangeFeedConsumer,
+                                    producer: KafkaEventProducer)(implicit ec: 
ExecutionContext, log: Logging): Unit = {
     
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind,
 "closeFeedListeners") { () =>
-      implicit val ec = system.dispatcher
-      Future
-        .successful {
-          feedManager.close()
-        }
+      feedConsumer
+        .close()
         .flatMap { _ =>
           producer.close().andThen {
             case Success(_) =>
-              log.info("Kafka producer successfully shutdown")
+              log.info(this, "Kafka producer successfully shutdown")
           }
         }
     }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
index ce6eed4..5479beb 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
@@ -17,34 +17,23 @@
 
 package org.apache.openwhisk.core.database.cosmosdb.cache
 
-import java.net.URI
-
-import 
com.microsoft.azure.documentdb.changefeedprocessor.{DocumentCollectionInfo => 
JDocumentCollectionInfo}
+import com.azure.data.cosmos.{ConnectionMode, ConsistencyLevel}
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigUtil.joinPath
 import pureconfig.loadConfigOrThrow
 
-import scala.concurrent.duration.FiniteDuration
-
-case class DocumentCollectionInfo(connectionInfo: ConnectionInfo, 
collectionName: String) {
-
-  def asJava: JDocumentCollectionInfo = {
-    val info = new JDocumentCollectionInfo
-    info.setUri(new URI(connectionInfo.endpoint))
-    info.setDatabaseName(connectionInfo.db)
-    info.setCollectionName(collectionName)
-    info.setMasterKey(connectionInfo.key)
-    info
-  }
-}
-
-case class ConnectionInfo(endpoint: String, key: String, db: String)
+case class ConnectionInfo(endpoint: String,
+                          key: String,
+                          db: String,
+                          throughput: Int,
+                          connectionMode: ConnectionMode,
+                          consistencyLevel: ConsistencyLevel)
 
-case class FeedConfig(hostname: String, leaseCollection: String)
+case class FeedConfig(hostname: String, leaseCollection: String, 
startFromBeginning: Boolean)
 
 case class EventProducerConfig(bufferSize: Int)
 
-case class InvalidatorConfig(port: Int, feedPublishTimeout: FiniteDuration, 
clusterId: Option[String])
+case class InvalidatorConfig(port: Int, clusterId: Option[String])
 
 case class CacheInvalidatorConfig(globalConfig: Config) {
   val configRoot = "whisk.cache-invalidator"
@@ -56,7 +45,7 @@ case class CacheInvalidatorConfig(globalConfig: Config) {
     
loadConfigOrThrow[EventProducerConfig](globalConfig.getConfig(eventConfigRoot))
   val invalidatorConfig: InvalidatorConfig = 
loadConfigOrThrow[InvalidatorConfig](globalConfig.getConfig(configRoot))
 
-  def getCollectionInfo(name: String): DocumentCollectionInfo = {
+  def getCollectionInfo(name: String): ConnectionInfo = {
     val config = globalConfig.getConfig(cosmosConfigRoot)
     val specificConfigPath = joinPath(connections, name)
 
@@ -67,7 +56,6 @@ case class CacheInvalidatorConfig(globalConfig: Config) {
       config
     }
 
-    val info = loadConfigOrThrow[ConnectionInfo](entityConfig)
-    DocumentCollectionInfo(info, name)
+    loadConfigOrThrow[ConnectionInfo](entityConfig)
   }
 }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
new file mode 100644
index 0000000..27afef7
--- /dev/null
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import java.util
+
+import akka.Done
+import 
com.azure.data.cosmos.internal.changefeed.implementation.ChangeFeedProcessorBuilderImpl
+import 
com.azure.data.cosmos.internal.changefeed.{ChangeFeedObserverCloseReason, 
ChangeFeedObserverContext}
+import com.azure.data.cosmos.{
+  ChangeFeedProcessor,
+  ChangeFeedProcessorOptions,
+  ConnectionPolicy,
+  CosmosClient,
+  CosmosContainer,
+  CosmosItemProperties
+}
+import org.apache.openwhisk.common.Logging
+import reactor.core.publisher.Mono
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Seq
+import scala.compat.java8.FutureConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+trait ChangeFeedObserver {
+  def process(context: ChangeFeedObserverContext, docs: 
Seq[CosmosItemProperties]): Future[Done]
+}
+
+class ChangeFeedConsumer(collName: String, config: CacheInvalidatorConfig, 
observer: ChangeFeedObserver)(
+  implicit ec: ExecutionContext,
+  log: Logging) {
+  import ChangeFeedConsumer._
+
+  log.info(this, s"Watching changes in $collName with lease managed in 
${config.feedConfig.leaseCollection}")
+
+  private val clients = scala.collection.mutable.Map[ConnectionInfo, 
CosmosClient]().withDefault(createCosmosClient)
+  private val targetContainer = getContainer(collName)
+  private val leaseContainer = getContainer(config.feedConfig.leaseCollection, 
createIfNotExist = true)
+  private val (processor, startFuture) = {
+    val clusterId = config.invalidatorConfig.clusterId
+    val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
+
+    val feedOpts = new ChangeFeedProcessorOptions
+    feedOpts.leasePrefix(prefix)
+    feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
+
+    val builder = ChangeFeedProcessor.Builder
+      .hostName(config.feedConfig.hostname)
+      .feedContainer(targetContainer)
+      .leaseContainer(leaseContainer)
+      .options(feedOpts)
+      .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not 
exposed hence need to cast to impl
+
+    builder.observerFactory(() => ObserverBridge)
+    val p = builder.build()
+    (p, p.start().toFuture.toScala.map(_ => Done))
+  }
+
+  def isStarted: Future[Done] = startFuture
+
+  def close(): Future[Done] = {
+    val f = processor.stop().toFuture.toScala.map(_ => Done)
+    f.andThen {
+      case _ =>
+        clients.values.foreach(c => c.close())
+        Future.successful(Done)
+    }
+  }
+
+  private def getContainer(name: String, createIfNotExist: Boolean = false): 
CosmosContainer = {
+    val info = config.getCollectionInfo(name)
+    val client = clients(info)
+    val db = client.getDatabase(info.db)
+    val container = db.getContainer(name)
+
+    val resp = if (createIfNotExist) {
+      db.createContainerIfNotExists(name, "/id", info.throughput)
+    } else container.read()
+
+    resp.block().container()
+  }
+
+  private object ObserverBridge extends 
com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver {
+    override def open(context: ChangeFeedObserverContext): Unit = {}
+    override def close(context: ChangeFeedObserverContext, reason: 
ChangeFeedObserverCloseReason): Unit = {}
+    override def processChanges(context: ChangeFeedObserverContext,
+                                docs: util.List[CosmosItemProperties]): 
Mono[Void] = {
+      val f = observer.process(context, docs.asScala.toList).map(_ => 
null).toJava.toCompletableFuture
+      Mono.fromFuture(f)
+    }
+  }
+}
+
+object ChangeFeedConsumer {
+  def createCosmosClient(conInfo: ConnectionInfo): CosmosClient = {
+    val policy = 
ConnectionPolicy.defaultPolicy.connectionMode(conInfo.connectionMode)
+    CosmosClient.builder
+      .endpoint(conInfo.endpoint)
+      .key(conInfo.key)
+      .connectionPolicy(policy)
+      .consistencyLevel(conInfo.consistencyLevel)
+      .build
+  }
+}
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
deleted file mode 100644
index 48c1f93..0000000
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.openwhisk.core.database.cosmosdb.cache
-
-import java.io.Closeable
-import java.util
-
-import com.microsoft.azure.documentdb.changefeedprocessor.{
-  ChangeFeedEventHost,
-  ChangeFeedHostOptions,
-  ChangeFeedObserverCloseReason,
-  ChangeFeedObserverContext,
-  IChangeFeedObserver
-}
-import com.microsoft.azure.documentdb.{ChangeFeedOptions, Document}
-import org.apache.openwhisk.common.ExecutorCloser
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable.Seq
-
-class ChangeFeedManager(collName: String, observer: ChangeFeedObserver, 
config: CacheInvalidatorConfig)
-    extends Closeable {
-  private val listener = {
-    val collInfo = config.getCollectionInfo(collName)
-    val leaseCollInfo = 
config.getCollectionInfo(config.feedConfig.leaseCollection)
-    new ChangeFeedListener(collInfo, leaseCollInfo, config.feedConfig, 
observer, config.invalidatorConfig.clusterId)
-  }
-
-  override def close(): Unit = listener.close()
-}
-
-class ChangeFeedListener(collInfo: DocumentCollectionInfo,
-                         leaseCollInfo: DocumentCollectionInfo,
-                         feedConfig: FeedConfig,
-                         observer: ChangeFeedObserver,
-                         clusterId: Option[String])
-    extends Closeable {
-  private val host = {
-    val feedOpts = new ChangeFeedOptions
-    feedOpts.setPageSize(100)
-
-    val hostOpts = new ChangeFeedHostOptions
-    //Using same lease collection across collection. To avoid collision
-    //set prefix to coll name. Also include the clusterId such that multiple 
cluster
-    //can share the same collection
-    val prefix = clusterId.map(id => 
s"$id-${collInfo.collectionName}").getOrElse(collInfo.collectionName)
-    hostOpts.setLeasePrefix(prefix)
-
-    val host = new ChangeFeedEventHost(feedConfig.hostname, collInfo.asJava, 
leaseCollInfo.asJava, feedOpts, hostOpts)
-    host.registerObserverFactory(() => observer)
-    host
-  }
-
-  override def close(): Unit = ExecutorCloser(host.getExecutorService).close()
-}
-
-abstract class ChangeFeedObserver extends IChangeFeedObserver {
-  override final def open(context: ChangeFeedObserverContext): Unit = Unit
-  override final def close(context: ChangeFeedObserverContext, reason: 
ChangeFeedObserverCloseReason): Unit = Unit
-  override final def processChanges(context: ChangeFeedObserverContext, docs: 
util.List[Document]): Unit =
-    process(context, docs.asScala.toList)
-  def process(context: ChangeFeedObserverContext, doc: Seq[Document])
-}
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
index 88a26e0..6b15e6a 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -23,7 +23,9 @@ import akka.kafka.scaladsl.Producer
 import akka.kafka.{ProducerMessage, ProducerSettings}
 import akka.stream.scaladsl.{Keep, Sink, Source}
 import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.openwhisk.connector.kafka.KamonMetricsReporter
 
 import scala.collection.immutable.Seq
 import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -41,7 +43,7 @@ case class KafkaEventProducer(
       case (msgs, p) =>
         ProducerMessage.multi(msgs.map(newRecord), p)
     }
-    .via(Producer.flexiFlow(settings))
+    .via(Producer.flexiFlow(producerSettings))
     .map {
       case ProducerMessage.MultiResult(_, passThrough) =>
         passThrough.success(Done)
@@ -66,4 +68,7 @@ case class KafkaEventProducer(
   }
 
   private def newRecord(msg: String) = new ProducerRecord[String, 
String](topic, "messages", msg)
+
+  private def producerSettings =
+    settings.withProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
KamonMetricsReporter.name)
 }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index 7b3a7f2..a5060d4 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -20,18 +20,20 @@ package org.apache.openwhisk.core.database.cosmosdb.cache
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import kamon.Kamon
-import org.apache.openwhisk.common.ConfigMXBean
+import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging}
 import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
 
 object Main {
   def main(args: Array[String]): Unit = {
     implicit val system: ActorSystem = 
ActorSystem("cache-invalidator-actor-system")
     implicit val materializer: ActorMaterializer = ActorMaterializer()
+    implicit val log: Logging = new 
AkkaLogging(akka.event.Logging.getLogger(system, this))
+
     ConfigMXBean.register()
     Kamon.loadReportersFromConfig()
     val port = 
CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
-    //TODO HTTPS for ping/metric endpoint?
     BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
     CacheInvalidator.start(system.settings.config)
+    log.info(this, s"Started the server at http://localhost:$port")
   }
 }
diff --git 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
index 184ba75..5c8b404 100644
--- 
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
+++ 
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
@@ -18,31 +18,38 @@
 package org.apache.openwhisk.core.database.cosmosdb.cache
 
 import akka.Done
-import akka.event.slf4j.SLF4JLogging
-import com.microsoft.azure.documentdb.Document
-import 
com.microsoft.azure.documentdb.changefeedprocessor.ChangeFeedObserverContext
+import com.azure.data.cosmos.CosmosItemProperties
+import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext
+import com.google.common.base.Throwables
 import kamon.metric.MeasurementUnit
-import org.apache.openwhisk.common.{LogMarkerToken, MetricEmitter}
+import org.apache.openwhisk.common.{LogMarkerToken, Logging, MetricEmitter}
 import org.apache.openwhisk.core.database.CacheInvalidationMessage
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants
-import org.apache.openwhisk.core.entity.CacheKey
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.unescapeId
+import org.apache.openwhisk.core.entity.CacheKey
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.immutable.Seq
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
 
-class WhiskChangeEventObserver(config: InvalidatorConfig, eventProducer: 
EventProducer) extends ChangeFeedObserver {
+class WhiskChangeEventObserver(config: InvalidatorConfig, eventProducer: 
EventProducer)(implicit ec: ExecutionContext,
+                                                                               
         log: Logging)
+    extends ChangeFeedObserver {
   import WhiskChangeEventObserver._
 
-  override def process(context: ChangeFeedObserverContext, docs: 
Seq[Document]): Unit = {
+  override def process(context: ChangeFeedObserverContext, docs: 
Seq[CosmosItemProperties]): Future[Done] = {
     //Each observer is called from a pool managed by CosmosDB 
ChangeFeedProcessor
     //So its fine to have a blocking wait. If this fails then batch would be 
reread and
     //retried thus ensuring at-least-once semantics
     val f = eventProducer.send(processDocs(docs, config))
-    Await.result(f, config.feedPublishTimeout)
-    MetricEmitter.emitCounterMetric(feedCounter, docs.size)
-    recordLag(context, docs.last)
+    f.andThen {
+      case Success(_) =>
+        MetricEmitter.emitCounterMetric(feedCounter, docs.size)
+        recordLag(context, docs.last)
+      case Failure(t) =>
+        log.warn(this, "Error occurred while sending cache invalidation 
message " + Throwables.getStackTraceAsString(t))
+    }
   }
 }
 
@@ -50,7 +57,7 @@ trait EventProducer {
   def send(msg: Seq[String]): Future[Done]
 }
 
-object WhiskChangeEventObserver extends SLF4JLogging {
+object WhiskChangeEventObserver {
   val instanceId = "cache-invalidator"
   private val feedCounter =
     LogMarkerToken("cosmosdb", "change_feed", "count", tags = Map("collection" 
-> "whisks"))(MeasurementUnit.none)
@@ -59,8 +66,8 @@ object WhiskChangeEventObserver extends SLF4JLogging {
   /**
    * Records the current lag on per partition basis. In ideal cases the lag 
should not continue to increase
    */
-  def recordLag(context: ChangeFeedObserverContext, lastDoc: Document): Unit = 
{
-    val sessionToken = context.getFeedResponde.getSessionToken
+  def recordLag(context: ChangeFeedObserverContext, lastDoc: 
CosmosItemProperties): Unit = {
+    val sessionToken = context.getFeedResponse.sessionToken()
     val lsnRef = lastDoc.get("_lsn")
     require(lsnRef != null, s"Non lsn defined in document $lastDoc")
 
@@ -87,7 +94,7 @@ object WhiskChangeEventObserver extends SLF4JLogging {
     lsn.toLong
   }
 
-  def processDocs(docs: Seq[Document], config: InvalidatorConfig): Seq[String] 
= {
+  def processDocs(docs: Seq[CosmosItemProperties], config: 
InvalidatorConfig)(implicit log: Logging): Seq[String] = {
     docs
       .filter { doc =>
         val cid = Option(doc.getString(CosmosDBConstants.clusterId))
@@ -100,8 +107,8 @@ object WhiskChangeEventObserver extends SLF4JLogging {
         }
       }
       .map { doc =>
-        val id = unescapeId(doc.getId)
-        log.debug("Changed doc [{}]", id)
+        val id = unescapeId(doc.id())
+        log.info(this, s"Changed doc [$id]")
         val event = CacheInvalidationMessage(CacheKey(id), instanceId)
         event.serialize
       }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index 2fef6b8..1d83e33 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -16,6 +16,7 @@
  */
 
 package org.apache.openwhisk.core.database.cosmosdb.cache
+import akka.Done
 import akka.actor.CoordinatedShutdown
 import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
 import akka.stream.ActorMaterializer
@@ -24,7 +25,7 @@ import common.StreamLogging
 import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.CacheInvalidationMessage
+import org.apache.openwhisk.core.database.{CacheInvalidationMessage, 
RemoteCacheInvalidation}
 import 
org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, 
CosmosDBTestSupport}
 import org.apache.openwhisk.core.entity.{
   DocumentReader,
@@ -36,7 +37,7 @@ import org.apache.openwhisk.core.entity.{
   WhiskPackage
 }
 import org.junit.runner.RunWith
-import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{Matchers, TryValues}
 
@@ -51,11 +52,11 @@ class CacheInvalidatorTests
     with CosmosDBTestSupport
     with Matchers
     with ScalaFutures
-    with IntegrationPatience
     with TryValues
     with StreamLogging {
 
-  implicit val materializer: ActorMaterializer = ActorMaterializer()
+  private implicit val materializer: ActorMaterializer = ActorMaterializer()
+  implicit override val patienceConfig: PatienceConfig = 
PatienceConfig(timeout = 300.seconds)
 
   override def createKafkaConfig: EmbeddedKafkaConfig = 
EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
 
@@ -79,14 +80,16 @@ class CacheInvalidatorTests
     val store = 
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
     val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), 
EntityName(randomString()))
 
+    //Start cache invalidator after the db for whisks is created
+    startCacheInvalidator()
+    log.info("Cache Invalidator service started")
+
     //Store stuff in db
     val info = store.put(pkg).futureValue
     log.info(s"Added document ${info.id}")
 
-    //Start cache invalidator after the db for whisks is created
-    startCacheInvalidator()
     //This should result in change feed trigger and event to kafka topic
-    val topic = "cacheInvalidation"
+    val topic = RemoteCacheInvalidation.cacheInvalidationTopic
     val msgs =
       consumeNumberMessagesFromTopics(Set(topic), 1, timeout = 
60.seconds)(createKafkaConfig, new StringDeserializer())(
         topic)
@@ -109,11 +112,12 @@ class CacheInvalidatorTests
       |  cache-invalidator {
       |    cosmosdb {
       |      db = "$dbName"
+      |      start-from-beginning  = true
       |    }
       |  }
       |}
       """.stripMargin).withFallback(ConfigFactory.load())
-    CacheInvalidator.start(tsconfig)
+    CacheInvalidator.start(tsconfig).futureValue shouldBe Done
   }
 
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
index a372fa3..6d369db 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
@@ -16,7 +16,8 @@
  */
 
 package org.apache.openwhisk.core.database.cosmosdb.cache
-import com.microsoft.azure.documentdb.Document
+import com.azure.data.cosmos.CosmosItemProperties
+import common.StreamLogging
 import org.apache.openwhisk.core.database.CacheInvalidationMessage
 import org.apache.openwhisk.core.entity.CacheKey
 import org.junit.runner.RunWith
@@ -25,10 +26,9 @@ import org.scalatest.{FlatSpec, Matchers}
 import spray.json.DefaultJsonProtocol
 
 import scala.collection.immutable.Seq
-import scala.concurrent.duration._
 
 @RunWith(classOf[JUnitRunner])
-class WhiskChangeEventObserverTests extends FlatSpec with Matchers {
+class WhiskChangeEventObserverTests extends FlatSpec with Matchers with 
StreamLogging {
   import WhiskChangeEventObserver.instanceId
 
   behavior of "CosmosDB extract LSN from Session token"
@@ -48,7 +48,7 @@ class WhiskChangeEventObserverTests extends FlatSpec with 
Matchers {
   behavior of "CosmosDB feed events"
 
   it should "generate cache events" in {
-    val config = InvalidatorConfig(8080, 60.seconds, None)
+    val config = InvalidatorConfig(8080, None)
     val docs = Seq(createDoc("foo"), createDoc("bar"))
     val processedDocs = WhiskChangeEventObserver.processDocs(docs, config)
 
@@ -58,7 +58,7 @@ class WhiskChangeEventObserverTests extends FlatSpec with 
Matchers {
   }
 
   it should "filter clusterId" in {
-    val config = InvalidatorConfig(8080, 60.seconds, Some("cid1"))
+    val config = InvalidatorConfig(8080, Some("cid1"))
     val docs = Seq(createDoc("foo", Some("cid2")), createDoc("bar", 
Some("cid1")), createDoc("baz"))
     val processedDocs = WhiskChangeEventObserver.processDocs(docs, config)
 
@@ -68,10 +68,10 @@ class WhiskChangeEventObserverTests extends FlatSpec with 
Matchers {
       CacheInvalidationMessage(CacheKey("baz"), instanceId))
   }
 
-  private def createDoc(id: String, clusterId: Option[String] = None): 
Document = {
+  private def createDoc(id: String, clusterId: Option[String] = None): 
CosmosItemProperties = {
     val cdoc = CosmosDBDoc(id, clusterId)
     val json = CosmosDBDoc.seredes.write(cdoc).compactPrint
-    new Document(json)
+    new CosmosItemProperties(json)
   }
 
   case class CosmosDBDoc(id: String, _clusterId: Option[String], _lsn: Int = 
42)
diff --git a/tools/jenkins/apache/dockerhub.groovy 
b/tools/jenkins/apache/dockerhub.groovy
index a9c8a5f..3206e37 100644
--- a/tools/jenkins/apache/dockerhub.groovy
+++ b/tools/jenkins/apache/dockerhub.groovy
@@ -29,7 +29,7 @@ node('openwhisk1||openwhisk2||openwhisk3') {
       withCredentials([usernamePassword(credentialsId: 'openwhisk_dockerhub', 
passwordVariable: 'DOCKER_PASSWORD', usernameVariable: 'DOCKER_USER')]) {
           sh 'docker login -u ${DOCKER_USER} -p ${DOCKER_PASSWORD}'
       }
-      def PUSH_CMD = "./gradlew :core:controller:distDocker 
:core:invoker:distDocker :core:monitoring:user-events:distDocker 
:tools:ow-utils:distDocker -PdockerRegistry=docker.io 
-PdockerImagePrefix=openwhisk"
+      def PUSH_CMD = "./gradlew :core:controller:distDocker 
:core:invoker:distDocker :core:monitoring:user-events:distDocker 
:tools:ow-utils:distDocker :core:cosmos:cache-invalidator:distDocker 
-PdockerRegistry=docker.io -PdockerImagePrefix=openwhisk"
       def gitCommit = sh(returnStdout: true, script: 'git rev-parse 
HEAD').trim()
       def shortCommit = gitCommit.take(7)
       sh "./gradlew clean"

Reply via email to