This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new f69ecb7 Switch to Cosmos SDK v3 for Cache Invalidator Service (#4677)
f69ecb7 is described below
commit f69ecb78eafde2cc8f55ba38570f9bab92cc077a
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Oct 22 14:19:12 2019 +0530
Switch to Cosmos SDK v3 for Cache Invalidator Service (#4677)
* Switch to Cosmos SDK v3
* Remove logging related logic as v3 uses slf4j and less chatty in logs
* Register Kamon metric reporter with Kafka producer
* Log the server url upon startup
---
.../core/database/RemoteCacheInvalidation.scala | 11 +-
core/cosmosdb/cache-invalidator/build.gradle | 5 +-
.../src/main/resources/reference.conf | 22 +++-
.../src/main/resources/whisk-logback.xml | 27 -----
.../database/cosmosdb/cache/CacheInvalidator.scala | 47 ++++----
.../cosmosdb/cache/CacheInvalidatorConfig.scala | 34 ++----
.../cosmosdb/cache/ChangeFeedConsumer.scala | 120 +++++++++++++++++++++
.../cosmosdb/cache/ChangeFeedListener.scala | 77 -------------
.../cosmosdb/cache/KafkaEventProducer.scala | 7 +-
.../core/database/cosmosdb/cache/Main.scala | 6 +-
.../cosmosdb/cache/WhiskChangeEventObserver.scala | 41 ++++---
.../cosmosdb/cache/CacheInvalidatorTests.scala | 20 ++--
.../cache/WhiskChangeEventObserverTests.scala | 14 +--
tools/jenkins/apache/dockerhub.groovy | 2 +-
14 files changed, 235 insertions(+), 198 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
index 9fca79e..62d06c8 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
@@ -54,19 +54,18 @@ object CacheInvalidationMessage extends DefaultJsonProtocol
{
class RemoteCacheInvalidation(config: WhiskConfig, component: String,
instance: ControllerInstanceId)(
implicit logging: Logging,
as: ActorSystem) {
-
+ import RemoteCacheInvalidation._
implicit private val ec =
as.dispatchers.lookup("dispatchers.kafka-dispatcher")
- private val topic = "cacheInvalidation"
private val instanceId = s"$component${instance.asString}"
private val msgProvider = SpiLoader.get[MessagingProvider]
private val cacheInvalidationConsumer =
- msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+ msgProvider.getConsumer(config, s"$cacheInvalidationTopic$instanceId",
cacheInvalidationTopic, maxPeek = 128)
private val cacheInvalidationProducer = msgProvider.getProducer(config)
def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
- cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key,
instanceId)).map(_ => Unit)
+ cacheInvalidationProducer.send(cacheInvalidationTopic,
CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
}
private val invalidationFeed = as.actorOf(Props {
@@ -100,3 +99,7 @@ class RemoteCacheInvalidation(config: WhiskConfig,
component: String, instance:
invalidationFeed ! MessageFeed.Processed
}
}
+
+object RemoteCacheInvalidation {
+ val cacheInvalidationTopic = "cacheInvalidation"
+}
diff --git a/core/cosmosdb/cache-invalidator/build.gradle
b/core/cosmosdb/cache-invalidator/build.gradle
index 0da319c..74356fc 100644
--- a/core/cosmosdb/cache-invalidator/build.gradle
+++ b/core/cosmosdb/cache-invalidator/build.gradle
@@ -34,10 +34,7 @@ repositories {
dependencies {
compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')
- compile ("com.microsoft.azure:azure-cosmos-changefeedprocessor:0.9.2") {
- exclude group: 'junit'
- exclude group: 'commons-logging'
- }
+ compile "com.microsoft.azure:azure-cosmos:3.3.0"
compile
"com.typesafe.akka:akka-stream-kafka_2.12:${gradle.akka_kafka.version}"
}
diff --git a/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
index d31607e..e3a64fe 100644
--- a/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
+++ b/core/cosmosdb/cache-invalidator/src/main/resources/reference.conf
@@ -25,6 +25,17 @@ whisk {
# Database name
db = ${?COSMOSDB_NAME}
+ # ConnectionMode. Can be one of
+ # - GATEWAY
+ # - DIRECT
+ connection-mode = "GATEWAY"
+
+ # Consistency level used for DB operations
+ consistency-level = "SESSION"
+
+ # Default throughput used while creating a new lease collection
+ throughput = 400
+
# Name of collection in which lease related data is stored
lease-collection = "cache-invalidator-lease"
@@ -32,6 +43,14 @@ whisk {
# If multiple instance running then set it to some unique name
hostname = "cache-invalidator"
+ # Sets a value indicating whether the change feed in the Azure Cosmos DB
service should start from the beginning
+ # This is only used when
+ # 1. Lease store is not initialized and is ignored if a lease for
partition exists and has continuation token
+ # 2. StartContinuation is not specified
+ # 3. StartTime is not specified
+ # This is mostly meant for test purposes where we create the db for the
first time; without this, tests at times fail
+ start-from-beginning = false
+
collections {
# Provide collection specific connection info here
# This can be used if lease collection is to be placed in a separate
endpoint/db
@@ -42,9 +61,6 @@ whisk {
# HTTP Server port
port = 8080
- # Timeout for waiting for batch of feed changes to be published as Kafka
event
- feed-publish-timeout = 120 s
-
# Current clusterId - If configured then changes which are done by current
cluster would be ignored
# i.e. no cache invalidation event message would be generated for those
changes
# cluster-id =
diff --git
a/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
b/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
deleted file mode 100644
index 70936ac..0000000
--- a/core/cosmosdb/cache-invalidator/src/main/resources/whisk-logback.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<included>
- <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
- <resetJUL>true</resetJUL>
- </contextListener>
- <!-- Disable verbose info logging from processor library-->
- <logger name="com.microsoft.azure.documentdb.changefeedprocessor.services"
level="WARN" />
- <logger name="com.microsoft.azure.documentdb.changefeedprocessor.internal"
level="WARN" />
-
- <logger
name="org.apache.openwhisk.core.database.cosmosdb.cache.WhiskChangeEventObserver"
level="DEBUG" />
-</included>
\ No newline at end of file
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
index e318952..6a97bcc 100644
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidator.scala
@@ -16,30 +16,27 @@
*/
package org.apache.openwhisk.core.database.cosmosdb.cache
+import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
-import akka.event.slf4j.SLF4JLogging
import akka.kafka.ProducerSettings
import akka.stream.ActorMaterializer
+import com.google.common.base.Throwables
import com.typesafe.config.Config
import org.apache.kafka.common.serialization.StringSerializer
-import org.slf4j.bridge.SLF4JBridgeHandler
+import org.apache.openwhisk.common.Logging
+import
org.apache.openwhisk.core.database.RemoteCacheInvalidation.cacheInvalidationTopic
-import scala.concurrent.Future
-import scala.util.Success
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
-object CacheInvalidator extends SLF4JLogging {
- //CosmosDB changefeed support uses Java Logging.
- // Those needs to be routed to Slf4j
- SLF4JBridgeHandler.removeHandlersForRootLogger()
- SLF4JBridgeHandler.install()
-
- //TODO Replace with constant from RemoteCacheInvalidation
- val cacheInvalidationTopic = "cacheInvalidation"
+object CacheInvalidator {
val instanceId = "cache-invalidator"
val whisksCollection = "whisks"
- def start(globalConfig: Config)(implicit system: ActorSystem, materializer:
ActorMaterializer): Unit = {
+ def start(
+ globalConfig: Config)(implicit system: ActorSystem, materializer:
ActorMaterializer, log: Logging): Future[Done] = {
+ implicit val ec: ExecutionContext = system.dispatcher
val config = CacheInvalidatorConfig(globalConfig)
val producer =
KafkaEventProducer(
@@ -47,24 +44,26 @@ object CacheInvalidator extends SLF4JLogging {
cacheInvalidationTopic,
config.eventProducerConfig)
val observer = new WhiskChangeEventObserver(config.invalidatorConfig,
producer)
- val feedManager = new ChangeFeedManager(whisksCollection, observer, config)
- registerShutdownTasks(system, feedManager, producer)
- log.info(s"Started the Cache invalidator service. ClusterId
[${config.invalidatorConfig.clusterId}]")
+ val feedConsumer = new ChangeFeedConsumer(whisksCollection, config,
observer)
+ feedConsumer.isStarted.andThen {
+ case Success(_) =>
+ registerShutdownTasks(system, feedConsumer, producer)
+ log.info(this, s"Started the Cache invalidator service. ClusterId
[${config.invalidatorConfig.clusterId}]")
+ case Failure(t) =>
+ log.error(this, "Error occurred while starting the Consumer" +
Throwables.getStackTraceAsString(t))
+ }
}
private def registerShutdownTasks(system: ActorSystem,
- feedManager: ChangeFeedManager,
- producer: KafkaEventProducer): Unit = {
+ feedConsumer: ChangeFeedConsumer,
+ producer: KafkaEventProducer)(implicit ec:
ExecutionContext, log: Logging): Unit = {
CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind,
"closeFeedListeners") { () =>
- implicit val ec = system.dispatcher
- Future
- .successful {
- feedManager.close()
- }
+ feedConsumer
+ .close()
.flatMap { _ =>
producer.close().andThen {
case Success(_) =>
- log.info("Kafka producer successfully shutdown")
+ log.info(this, "Kafka producer successfully shutdown")
}
}
}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
index ce6eed4..5479beb 100644
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorConfig.scala
@@ -17,34 +17,23 @@
package org.apache.openwhisk.core.database.cosmosdb.cache
-import java.net.URI
-
-import
com.microsoft.azure.documentdb.changefeedprocessor.{DocumentCollectionInfo =>
JDocumentCollectionInfo}
+import com.azure.data.cosmos.{ConnectionMode, ConsistencyLevel}
import com.typesafe.config.Config
import com.typesafe.config.ConfigUtil.joinPath
import pureconfig.loadConfigOrThrow
-import scala.concurrent.duration.FiniteDuration
-
-case class DocumentCollectionInfo(connectionInfo: ConnectionInfo,
collectionName: String) {
-
- def asJava: JDocumentCollectionInfo = {
- val info = new JDocumentCollectionInfo
- info.setUri(new URI(connectionInfo.endpoint))
- info.setDatabaseName(connectionInfo.db)
- info.setCollectionName(collectionName)
- info.setMasterKey(connectionInfo.key)
- info
- }
-}
-
-case class ConnectionInfo(endpoint: String, key: String, db: String)
+case class ConnectionInfo(endpoint: String,
+ key: String,
+ db: String,
+ throughput: Int,
+ connectionMode: ConnectionMode,
+ consistencyLevel: ConsistencyLevel)
-case class FeedConfig(hostname: String, leaseCollection: String)
+case class FeedConfig(hostname: String, leaseCollection: String,
startFromBeginning: Boolean)
case class EventProducerConfig(bufferSize: Int)
-case class InvalidatorConfig(port: Int, feedPublishTimeout: FiniteDuration,
clusterId: Option[String])
+case class InvalidatorConfig(port: Int, clusterId: Option[String])
case class CacheInvalidatorConfig(globalConfig: Config) {
val configRoot = "whisk.cache-invalidator"
@@ -56,7 +45,7 @@ case class CacheInvalidatorConfig(globalConfig: Config) {
loadConfigOrThrow[EventProducerConfig](globalConfig.getConfig(eventConfigRoot))
val invalidatorConfig: InvalidatorConfig =
loadConfigOrThrow[InvalidatorConfig](globalConfig.getConfig(configRoot))
- def getCollectionInfo(name: String): DocumentCollectionInfo = {
+ def getCollectionInfo(name: String): ConnectionInfo = {
val config = globalConfig.getConfig(cosmosConfigRoot)
val specificConfigPath = joinPath(connections, name)
@@ -67,7 +56,6 @@ case class CacheInvalidatorConfig(globalConfig: Config) {
config
}
- val info = loadConfigOrThrow[ConnectionInfo](entityConfig)
- DocumentCollectionInfo(info, name)
+ loadConfigOrThrow[ConnectionInfo](entityConfig)
}
}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
new file mode 100644
index 0000000..27afef7
--- /dev/null
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedConsumer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb.cache
+
+import java.util
+
+import akka.Done
+import
com.azure.data.cosmos.internal.changefeed.implementation.ChangeFeedProcessorBuilderImpl
+import
com.azure.data.cosmos.internal.changefeed.{ChangeFeedObserverCloseReason,
ChangeFeedObserverContext}
+import com.azure.data.cosmos.{
+ ChangeFeedProcessor,
+ ChangeFeedProcessorOptions,
+ ConnectionPolicy,
+ CosmosClient,
+ CosmosContainer,
+ CosmosItemProperties
+}
+import org.apache.openwhisk.common.Logging
+import reactor.core.publisher.Mono
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Seq
+import scala.compat.java8.FutureConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+trait ChangeFeedObserver {
+ def process(context: ChangeFeedObserverContext, docs:
Seq[CosmosItemProperties]): Future[Done]
+}
+
+class ChangeFeedConsumer(collName: String, config: CacheInvalidatorConfig,
observer: ChangeFeedObserver)(
+ implicit ec: ExecutionContext,
+ log: Logging) {
+ import ChangeFeedConsumer._
+
+ log.info(this, s"Watching changes in $collName with lease managed in
${config.feedConfig.leaseCollection}")
+
+ private val clients = scala.collection.mutable.Map[ConnectionInfo,
CosmosClient]().withDefault(createCosmosClient)
+ private val targetContainer = getContainer(collName)
+ private val leaseContainer = getContainer(config.feedConfig.leaseCollection,
createIfNotExist = true)
+ private val (processor, startFuture) = {
+ val clusterId = config.invalidatorConfig.clusterId
+ val prefix = clusterId.map(id => s"$id-$collName").getOrElse(collName)
+
+ val feedOpts = new ChangeFeedProcessorOptions
+ feedOpts.leasePrefix(prefix)
+ feedOpts.startFromBeginning(config.feedConfig.startFromBeginning)
+
+ val builder = ChangeFeedProcessor.Builder
+ .hostName(config.feedConfig.hostname)
+ .feedContainer(targetContainer)
+ .leaseContainer(leaseContainer)
+ .options(feedOpts)
+ .asInstanceOf[ChangeFeedProcessorBuilderImpl] //observerFactory is not
exposed hence need to cast to impl
+
+ builder.observerFactory(() => ObserverBridge)
+ val p = builder.build()
+ (p, p.start().toFuture.toScala.map(_ => Done))
+ }
+
+ def isStarted: Future[Done] = startFuture
+
+ def close(): Future[Done] = {
+ val f = processor.stop().toFuture.toScala.map(_ => Done)
+ f.andThen {
+ case _ =>
+ clients.values.foreach(c => c.close())
+ Future.successful(Done)
+ }
+ }
+
+ private def getContainer(name: String, createIfNotExist: Boolean = false):
CosmosContainer = {
+ val info = config.getCollectionInfo(name)
+ val client = clients(info)
+ val db = client.getDatabase(info.db)
+ val container = db.getContainer(name)
+
+ val resp = if (createIfNotExist) {
+ db.createContainerIfNotExists(name, "/id", info.throughput)
+ } else container.read()
+
+ resp.block().container()
+ }
+
+ private object ObserverBridge extends
com.azure.data.cosmos.internal.changefeed.ChangeFeedObserver {
+ override def open(context: ChangeFeedObserverContext): Unit = {}
+ override def close(context: ChangeFeedObserverContext, reason:
ChangeFeedObserverCloseReason): Unit = {}
+ override def processChanges(context: ChangeFeedObserverContext,
+ docs: util.List[CosmosItemProperties]):
Mono[Void] = {
+ val f = observer.process(context, docs.asScala.toList).map(_ =>
null).toJava.toCompletableFuture
+ Mono.fromFuture(f)
+ }
+ }
+}
+
+object ChangeFeedConsumer {
+ def createCosmosClient(conInfo: ConnectionInfo): CosmosClient = {
+ val policy =
ConnectionPolicy.defaultPolicy.connectionMode(conInfo.connectionMode)
+ CosmosClient.builder
+ .endpoint(conInfo.endpoint)
+ .key(conInfo.key)
+ .connectionPolicy(policy)
+ .consistencyLevel(conInfo.consistencyLevel)
+ .build
+ }
+}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
deleted file mode 100644
index 48c1f93..0000000
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/ChangeFeedListener.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.openwhisk.core.database.cosmosdb.cache
-
-import java.io.Closeable
-import java.util
-
-import com.microsoft.azure.documentdb.changefeedprocessor.{
- ChangeFeedEventHost,
- ChangeFeedHostOptions,
- ChangeFeedObserverCloseReason,
- ChangeFeedObserverContext,
- IChangeFeedObserver
-}
-import com.microsoft.azure.documentdb.{ChangeFeedOptions, Document}
-import org.apache.openwhisk.common.ExecutorCloser
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable.Seq
-
-class ChangeFeedManager(collName: String, observer: ChangeFeedObserver,
config: CacheInvalidatorConfig)
- extends Closeable {
- private val listener = {
- val collInfo = config.getCollectionInfo(collName)
- val leaseCollInfo =
config.getCollectionInfo(config.feedConfig.leaseCollection)
- new ChangeFeedListener(collInfo, leaseCollInfo, config.feedConfig,
observer, config.invalidatorConfig.clusterId)
- }
-
- override def close(): Unit = listener.close()
-}
-
-class ChangeFeedListener(collInfo: DocumentCollectionInfo,
- leaseCollInfo: DocumentCollectionInfo,
- feedConfig: FeedConfig,
- observer: ChangeFeedObserver,
- clusterId: Option[String])
- extends Closeable {
- private val host = {
- val feedOpts = new ChangeFeedOptions
- feedOpts.setPageSize(100)
-
- val hostOpts = new ChangeFeedHostOptions
- //Using same lease collection across collection. To avoid collision
- //set prefix to coll name. Also include the clusterId such that multiple
cluster
- //can share the same collection
- val prefix = clusterId.map(id =>
s"$id-${collInfo.collectionName}").getOrElse(collInfo.collectionName)
- hostOpts.setLeasePrefix(prefix)
-
- val host = new ChangeFeedEventHost(feedConfig.hostname, collInfo.asJava,
leaseCollInfo.asJava, feedOpts, hostOpts)
- host.registerObserverFactory(() => observer)
- host
- }
-
- override def close(): Unit = ExecutorCloser(host.getExecutorService).close()
-}
-
-abstract class ChangeFeedObserver extends IChangeFeedObserver {
- override final def open(context: ChangeFeedObserverContext): Unit = Unit
- override final def close(context: ChangeFeedObserverContext, reason:
ChangeFeedObserverCloseReason): Unit = Unit
- override final def processChanges(context: ChangeFeedObserverContext, docs:
util.List[Document]): Unit =
- process(context, docs.asScala.toList)
- def process(context: ChangeFeedObserverContext, doc: Seq[Document])
-}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
index 88a26e0..6b15e6a 100644
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/KafkaEventProducer.scala
@@ -23,7 +23,9 @@ import akka.kafka.scaladsl.Producer
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.openwhisk.connector.kafka.KamonMetricsReporter
import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -41,7 +43,7 @@ case class KafkaEventProducer(
case (msgs, p) =>
ProducerMessage.multi(msgs.map(newRecord), p)
}
- .via(Producer.flexiFlow(settings))
+ .via(Producer.flexiFlow(producerSettings))
.map {
case ProducerMessage.MultiResult(_, passThrough) =>
passThrough.success(Done)
@@ -66,4 +68,7 @@ case class KafkaEventProducer(
}
private def newRecord(msg: String) = new ProducerRecord[String,
String](topic, "messages", msg)
+
+ private def producerSettings =
+ settings.withProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
KamonMetricsReporter.name)
}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
index 7b3a7f2..a5060d4 100644
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala
@@ -20,18 +20,20 @@ package org.apache.openwhisk.core.database.cosmosdb.cache
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import kamon.Kamon
-import org.apache.openwhisk.common.ConfigMXBean
+import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
object Main {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem =
ActorSystem("cache-invalidator-actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
+ implicit val log: Logging = new
AkkaLogging(akka.event.Logging.getLogger(system, this))
+
ConfigMXBean.register()
Kamon.loadReportersFromConfig()
val port =
CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
- //TODO HTTPS for ping/metric endpoint?
BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
CacheInvalidator.start(system.settings.config)
+ log.info(this, s"Started the server at http://localhost:$port")
}
}
diff --git
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
index 184ba75..5c8b404 100644
---
a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
+++
b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserver.scala
@@ -18,31 +18,38 @@
package org.apache.openwhisk.core.database.cosmosdb.cache
import akka.Done
-import akka.event.slf4j.SLF4JLogging
-import com.microsoft.azure.documentdb.Document
-import
com.microsoft.azure.documentdb.changefeedprocessor.ChangeFeedObserverContext
+import com.azure.data.cosmos.CosmosItemProperties
+import com.azure.data.cosmos.internal.changefeed.ChangeFeedObserverContext
+import com.google.common.base.Throwables
import kamon.metric.MeasurementUnit
-import org.apache.openwhisk.common.{LogMarkerToken, MetricEmitter}
+import org.apache.openwhisk.common.{LogMarkerToken, Logging, MetricEmitter}
import org.apache.openwhisk.core.database.CacheInvalidationMessage
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants
-import org.apache.openwhisk.core.entity.CacheKey
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBUtil.unescapeId
+import org.apache.openwhisk.core.entity.CacheKey
import scala.collection.concurrent.TrieMap
import scala.collection.immutable.Seq
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
-class WhiskChangeEventObserver(config: InvalidatorConfig, eventProducer:
EventProducer) extends ChangeFeedObserver {
+class WhiskChangeEventObserver(config: InvalidatorConfig, eventProducer:
EventProducer)(implicit ec: ExecutionContext,
+
log: Logging)
+ extends ChangeFeedObserver {
import WhiskChangeEventObserver._
- override def process(context: ChangeFeedObserverContext, docs:
Seq[Document]): Unit = {
+ override def process(context: ChangeFeedObserverContext, docs:
Seq[CosmosItemProperties]): Future[Done] = {
//Each observer is called from a pool managed by CosmosDB
ChangeFeedProcessor
//So its fine to have a blocking wait. If this fails then batch would be
reread and
//retried thus ensuring at-least-once semantics
val f = eventProducer.send(processDocs(docs, config))
- Await.result(f, config.feedPublishTimeout)
- MetricEmitter.emitCounterMetric(feedCounter, docs.size)
- recordLag(context, docs.last)
+ f.andThen {
+ case Success(_) =>
+ MetricEmitter.emitCounterMetric(feedCounter, docs.size)
+ recordLag(context, docs.last)
+ case Failure(t) =>
+ log.warn(this, "Error occurred while sending cache invalidation
message " + Throwables.getStackTraceAsString(t))
+ }
}
}
@@ -50,7 +57,7 @@ trait EventProducer {
def send(msg: Seq[String]): Future[Done]
}
-object WhiskChangeEventObserver extends SLF4JLogging {
+object WhiskChangeEventObserver {
val instanceId = "cache-invalidator"
private val feedCounter =
LogMarkerToken("cosmosdb", "change_feed", "count", tags = Map("collection"
-> "whisks"))(MeasurementUnit.none)
@@ -59,8 +66,8 @@ object WhiskChangeEventObserver extends SLF4JLogging {
/**
* Records the current lag on per partition basis. In ideal cases the lag
should not continue to increase
*/
- def recordLag(context: ChangeFeedObserverContext, lastDoc: Document): Unit =
{
- val sessionToken = context.getFeedResponde.getSessionToken
+ def recordLag(context: ChangeFeedObserverContext, lastDoc:
CosmosItemProperties): Unit = {
+ val sessionToken = context.getFeedResponse.sessionToken()
val lsnRef = lastDoc.get("_lsn")
require(lsnRef != null, s"Non lsn defined in document $lastDoc")
@@ -87,7 +94,7 @@ object WhiskChangeEventObserver extends SLF4JLogging {
lsn.toLong
}
- def processDocs(docs: Seq[Document], config: InvalidatorConfig): Seq[String]
= {
+ def processDocs(docs: Seq[CosmosItemProperties], config:
InvalidatorConfig)(implicit log: Logging): Seq[String] = {
docs
.filter { doc =>
val cid = Option(doc.getString(CosmosDBConstants.clusterId))
@@ -100,8 +107,8 @@ object WhiskChangeEventObserver extends SLF4JLogging {
}
}
.map { doc =>
- val id = unescapeId(doc.getId)
- log.debug("Changed doc [{}]", id)
+ val id = unescapeId(doc.id())
+ log.info(this, s"Changed doc [$id]")
val event = CacheInvalidationMessage(CacheKey(id), instanceId)
event.serialize
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index 2fef6b8..1d83e33 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -16,6 +16,7 @@
*/
package org.apache.openwhisk.core.database.cosmosdb.cache
+import akka.Done
import akka.actor.CoordinatedShutdown
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
import akka.stream.ActorMaterializer
@@ -24,7 +25,7 @@ import common.StreamLogging
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.CacheInvalidationMessage
+import org.apache.openwhisk.core.database.{CacheInvalidationMessage,
RemoteCacheInvalidation}
import
org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider,
CosmosDBTestSupport}
import org.apache.openwhisk.core.entity.{
DocumentReader,
@@ -36,7 +37,7 @@ import org.apache.openwhisk.core.entity.{
WhiskPackage
}
import org.junit.runner.RunWith
-import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, TryValues}
@@ -51,11 +52,11 @@ class CacheInvalidatorTests
with CosmosDBTestSupport
with Matchers
with ScalaFutures
- with IntegrationPatience
with TryValues
with StreamLogging {
- implicit val materializer: ActorMaterializer = ActorMaterializer()
+ private implicit val materializer: ActorMaterializer = ActorMaterializer()
+ implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = 300.seconds)
override def createKafkaConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
@@ -79,14 +80,16 @@ class CacheInvalidatorTests
val store =
CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"),
EntityName(randomString()))
+ //Start cache invalidator after the db for whisks is created
+ startCacheInvalidator()
+ log.info("Cache Invalidator service started")
+
//Store stuff in db
val info = store.put(pkg).futureValue
log.info(s"Added document ${info.id}")
- //Start cache invalidator after the db for whisks is created
- startCacheInvalidator()
//This should result in change feed trigger and event to kafka topic
- val topic = "cacheInvalidation"
+ val topic = RemoteCacheInvalidation.cacheInvalidationTopic
val msgs =
consumeNumberMessagesFromTopics(Set(topic), 1, timeout =
60.seconds)(createKafkaConfig, new StringDeserializer())(
topic)
@@ -109,11 +112,12 @@ class CacheInvalidatorTests
| cache-invalidator {
| cosmosdb {
| db = "$dbName"
+ | start-from-beginning = true
| }
| }
|}
""".stripMargin).withFallback(ConfigFactory.load())
- CacheInvalidator.start(tsconfig)
+ CacheInvalidator.start(tsconfig).futureValue shouldBe Done
}
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
index a372fa3..6d369db 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/WhiskChangeEventObserverTests.scala
@@ -16,7 +16,8 @@
*/
package org.apache.openwhisk.core.database.cosmosdb.cache
-import com.microsoft.azure.documentdb.Document
+import com.azure.data.cosmos.CosmosItemProperties
+import common.StreamLogging
import org.apache.openwhisk.core.database.CacheInvalidationMessage
import org.apache.openwhisk.core.entity.CacheKey
import org.junit.runner.RunWith
@@ -25,10 +26,9 @@ import org.scalatest.{FlatSpec, Matchers}
import spray.json.DefaultJsonProtocol
import scala.collection.immutable.Seq
-import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
-class WhiskChangeEventObserverTests extends FlatSpec with Matchers {
+class WhiskChangeEventObserverTests extends FlatSpec with Matchers with
StreamLogging {
import WhiskChangeEventObserver.instanceId
behavior of "CosmosDB extract LSN from Session token"
@@ -48,7 +48,7 @@ class WhiskChangeEventObserverTests extends FlatSpec with
Matchers {
behavior of "CosmosDB feed events"
it should "generate cache events" in {
- val config = InvalidatorConfig(8080, 60.seconds, None)
+ val config = InvalidatorConfig(8080, None)
val docs = Seq(createDoc("foo"), createDoc("bar"))
val processedDocs = WhiskChangeEventObserver.processDocs(docs, config)
@@ -58,7 +58,7 @@ class WhiskChangeEventObserverTests extends FlatSpec with
Matchers {
}
it should "filter clusterId" in {
- val config = InvalidatorConfig(8080, 60.seconds, Some("cid1"))
+ val config = InvalidatorConfig(8080, Some("cid1"))
val docs = Seq(createDoc("foo", Some("cid2")), createDoc("bar",
Some("cid1")), createDoc("baz"))
val processedDocs = WhiskChangeEventObserver.processDocs(docs, config)
@@ -68,10 +68,10 @@ class WhiskChangeEventObserverTests extends FlatSpec with
Matchers {
CacheInvalidationMessage(CacheKey("baz"), instanceId))
}
- private def createDoc(id: String, clusterId: Option[String] = None):
Document = {
+ private def createDoc(id: String, clusterId: Option[String] = None):
CosmosItemProperties = {
val cdoc = CosmosDBDoc(id, clusterId)
val json = CosmosDBDoc.seredes.write(cdoc).compactPrint
- new Document(json)
+ new CosmosItemProperties(json)
}
case class CosmosDBDoc(id: String, _clusterId: Option[String], _lsn: Int =
42)
diff --git a/tools/jenkins/apache/dockerhub.groovy
b/tools/jenkins/apache/dockerhub.groovy
index a9c8a5f..3206e37 100644
--- a/tools/jenkins/apache/dockerhub.groovy
+++ b/tools/jenkins/apache/dockerhub.groovy
@@ -29,7 +29,7 @@ node('openwhisk1||openwhisk2||openwhisk3') {
withCredentials([usernamePassword(credentialsId: 'openwhisk_dockerhub',
passwordVariable: 'DOCKER_PASSWORD', usernameVariable: 'DOCKER_USER')]) {
sh 'docker login -u ${DOCKER_USER} -p ${DOCKER_PASSWORD}'
}
- def PUSH_CMD = "./gradlew :core:controller:distDocker
:core:invoker:distDocker :core:monitoring:user-events:distDocker
:tools:ow-utils:distDocker -PdockerRegistry=docker.io
-PdockerImagePrefix=openwhisk"
+ def PUSH_CMD = "./gradlew :core:controller:distDocker
:core:invoker:distDocker :core:monitoring:user-events:distDocker
:tools:ow-utils:distDocker :core:cosmos:cache-invalidator:distDocker
-PdockerRegistry=docker.io -PdockerImagePrefix=openwhisk"
def gitCommit = sh(returnStdout: true, script: 'git rev-parse
HEAD').trim()
def shortCommit = gitCommit.take(7)
sh "./gradlew clean"