This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new cd7196a Introduce separate Akka dispatchers for CouchDB and Kafka
Clients (#3515)
cd7196a is described below
commit cd7196a8e16aef6e848add239f6dec83c208e5f7
Author: Brendan McAdams <[email protected]>
AuthorDate: Wed Jun 13 12:13:49 2018 -0500
Introduce separate Akka dispatchers for CouchDB and Kafka Clients (#3515)
- Rest of system continues to use the default system dispatcher
- Kafka and CouchDB clients each get their own dispatchers
* This should prevent the Akka-based components from constantly competing
for threads
* Kafka and Couch can be tuned now based on individual needs
* Using thread-pool dispatchers, which are best suited for these
processes (default is fork-join)
- Dispatchers are defined in common `reference.conf`, so config can be
overridden by dependents
Refs #2954
---
common/scala/src/main/resources/reference.conf | 53 ++++++++++++++++++++++
.../whisk/core/connector/MessageConsumer.scala | 2 +-
.../whisk/core/database/CouchDbRestClient.scala | 5 +-
.../whisk/core/database/CouchDbRestStore.scala | 2 +-
.../core/database/RemoteCacheInvalidation.scala | 2 +-
5 files changed, 58 insertions(+), 6 deletions(-)
diff --git a/common/scala/src/main/resources/reference.conf
b/common/scala/src/main/resources/reference.conf
index 35cf4f0..df50702 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -8,3 +8,56 @@ whisk.spi {
LogStoreProvider =
whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
LoadBalancerProvider = whisk.core.loadBalancer.ShardingContainerPoolBalancer
}
+
+dispatchers {
+ # Custom dispatcher for CouchDB Client. Tune as needed.
+ couch-dispatcher {
+ type = Dispatcher
+ executor = "thread-pool-executor"
+
+ # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+ thread-pool-executor {
+ # Min number of threads to cap factor-based corePoolSize number to
+ core-pool-size-min = 2
+
+ # The core-pool-size-factor is used to determine corePoolSize of the
+ # ThreadPoolExecutor using the following formula:
+ # ceil(available processors * factor).
+ # Resulting size is then bounded by the core-pool-size-min and
+ # core-pool-size-max values.
+ core-pool-size-factor = 2.0
+
+ # Max number of threads to cap factor-based corePoolSize number to
+ core-pool-size-max = 32
+ }
+ # Throughput defines the number of messages that are processed in a batch
+ # before the thread is returned to the pool. Set to 1 for as fair as possible.
+ throughput = 5
+ }
+
+ # Custom dispatcher for Kafka client. Tune as needed.
+ kafka-dispatcher {
+ type = Dispatcher
+ executor = "thread-pool-executor"
+
+ # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+ thread-pool-executor {
+ # Min number of threads to cap factor-based corePoolSize number to
+ core-pool-size-min = 2
+
+ # The core-pool-size-factor is used to determine corePoolSize of the
+ # ThreadPoolExecutor using the following formula:
+ # ceil(available processors * factor).
+ # Resulting size is then bounded by the core-pool-size-min and
+ # core-pool-size-max values.
+ core-pool-size-factor = 2.0
+
+ # Max number of threads to cap factor-based corePoolSize number to
+ core-pool-size-max = 32
+ }
+
+ # Throughput defines the number of messages that are processed in a batch
+ # before the thread is returned to the pool. Set to 1 for as fair as possible.
+ throughput = 5
+ }
+}
diff --git
a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e..4c21545 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -170,7 +170,7 @@ class MessageFeed(description: String,
startWith(Idle, MessageFeed.NoData)
initialize()
- private implicit val ec = context.system.dispatcher
+ private implicit val ec =
context.system.dispatchers.lookup("dispatchers.kafka-dispatcher")
private def fillPipeline(): Unit = {
if (outstandingMessages.size <= pipelineFillThreshold) {
diff --git
a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index e38ce6f..3e8143a 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -18,7 +18,6 @@
package whisk.core.database
import scala.concurrent.Future
-
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
@@ -27,10 +26,8 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.scaladsl._
import akka.util.ByteString
-
import spray.json._
import spray.json.DefaultJsonProtocol._
-
import whisk.common.Logging
import whisk.http.PoolingRestClient
@@ -47,6 +44,8 @@ class CouchDbRestClient(protocol: String, host: String, port:
Int, username: Str
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
+ implicit override val context =
system.dispatchers.lookup("dispatchers.couch-dispatcher")
+
// Headers common to all requests.
val baseHeaders: List[HttpHeader] =
List(Authorization(BasicHttpCredentials(username, password)),
Accept(MediaTypes.`application/json`))
diff --git
a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index d0f13c9..aff7e85 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -62,7 +62,7 @@ class CouchDbRestStore[DocumentAbstraction <:
DocumentSerializer](dbProtocol: St
with DefaultJsonProtocol
with AttachmentInliner {
- protected[core] implicit val executionContext = system.dispatcher
+ protected[core] implicit val executionContext =
system.dispatchers.lookup("dispatchers.couch-dispatcher")
val attachmentScheme: String = "couch"
diff --git
a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index c632c83..417a8b9 100644
---
a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++
b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -54,7 +54,7 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
class RemoteCacheInvalidation(config: WhiskConfig, component: String,
instance: InstanceId)(implicit logging: Logging,
as: ActorSystem) {
- implicit private val ec = as.dispatcher
+ implicit private val ec =
as.dispatchers.lookup("dispatchers.kafka-dispatcher")
private val topic = "cacheInvalidation"
private val instanceId = s"$component${instance.toInt}"
--
To stop receiving notification emails like this one, please contact
[email protected].