This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new cd7196a  Introduce separate Akka dispatchers for CouchDB and Kafka Clients (#3515)
cd7196a is described below

commit cd7196a8e16aef6e848add239f6dec83c208e5f7
Author: Brendan McAdams <[email protected]>
AuthorDate: Wed Jun 13 12:13:49 2018 -0500

    Introduce separate Akka dispatchers for CouchDB and Kafka Clients (#3515)
    
    - Rest of system continues to use the default system dispatcher
    - Kafka and CouchDB clients each get their own dispatchers
      * This should prevent all of the akka based stuff from competing constantly for threads
      * Kafka and Couch can be tuned now based on individual needs
      * Using thread-pool dispatchers, which are best suited for these processes (default is fork-join)
    - Dispatchers are defined in common `reference.conf`, so config can be overridden by dependents
    
    Refs #2954
---
 common/scala/src/main/resources/reference.conf     | 53 ++++++++++++++++++++++
 .../whisk/core/connector/MessageConsumer.scala     |  2 +-
 .../whisk/core/database/CouchDbRestClient.scala    |  5 +-
 .../whisk/core/database/CouchDbRestStore.scala     |  2 +-
 .../core/database/RemoteCacheInvalidation.scala    |  2 +-
 5 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 35cf4f0..df50702 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -8,3 +8,56 @@ whisk.spi {
   LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
   LoadBalancerProvider = whisk.core.loadBalancer.ShardingContainerPoolBalancer
 }
+
+dispatchers {
+  # Custom dispatcher for CouchDB Client. Tune as needed.
+  couch-dispatcher {
+    type = Dispatcher
+    executor = "thread-pool-executor"
+
+    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+    thread-pool-executor {
+      # Min number of threads to cap factor-based corePoolSize number to
+      core-pool-size-min = 2
+
+      # The core-pool-size-factor is used to determine corePoolSize of the
+      # ThreadPoolExecutor using the following formula:
+      # ceil(available processors * factor).
+      # Resulting size is then bounded by the core-pool-size-min and
+      # core-pool-size-max values.
+      core-pool-size-factor = 2.0
+
+      # Max number of threads to cap factor-based corePoolSize number to
+      core-pool-size-max = 32
+    }
+    # Throughput defines the number of messages that are processed in a batch
+    # before the thread is returned to the pool. Set to 1 for as fair as possible.
+    throughput = 5
+  }
+
+  # Custom dispatcher for Kafka client. Tune as needed.
+  kafka-dispatcher {
+    type = Dispatcher
+    executor = "thread-pool-executor"
+
+    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+    thread-pool-executor {
+      # Min number of threads to cap factor-based corePoolSize number to
+      core-pool-size-min = 2
+
+      # The core-pool-size-factor is used to determine corePoolSize of the
+      # ThreadPoolExecutor using the following formula:
+      # ceil(available processors * factor).
+      # Resulting size is then bounded by the core-pool-size-min and
+      # core-pool-size-max values.
+      core-pool-size-factor = 2.0
+
+      # Max number of threads to cap factor-based corePoolSize number to
+      core-pool-size-max = 32
+    }
+
+    # Throughput defines the number of messages that are processed in a batch
+    # before the thread is returned to the pool. Set to 1 for as fair as possible.
+    throughput = 5
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e..4c21545 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -170,7 +170,7 @@ class MessageFeed(description: String,
   startWith(Idle, MessageFeed.NoData)
   initialize()
 
-  private implicit val ec = context.system.dispatcher
+  private implicit val ec = context.system.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
   private def fillPipeline(): Unit = {
     if (outstandingMessages.size <= pipelineFillThreshold) {
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index e38ce6f..3e8143a 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -18,7 +18,6 @@
 package whisk.core.database
 
 import scala.concurrent.Future
-
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 
@@ -27,10 +26,8 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers._
 import akka.stream.scaladsl._
 import akka.util.ByteString
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-
 import whisk.common.Logging
 import whisk.http.PoolingRestClient
 
@@ -47,6 +44,8 @@ class CouchDbRestClient(protocol: String, host: String, port: 
Int, username: Str
   logging: Logging)
     extends PoolingRestClient(protocol, host, port, 16 * 1024) {
 
+  implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher")
+
   // Headers common to all requests.
   val baseHeaders: List[HttpHeader] =
     List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index d0f13c9..aff7e85 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -62,7 +62,7 @@ class CouchDbRestStore[DocumentAbstraction <: 
DocumentSerializer](dbProtocol: St
     with DefaultJsonProtocol
     with AttachmentInliner {
 
-  protected[core] implicit val executionContext = system.dispatcher
+  protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
   val attachmentScheme: String = "couch"
 
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index c632c83..417a8b9 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -54,7 +54,7 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
 class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging,
                                                                                
             as: ActorSystem) {
 
-  implicit private val ec = as.dispatcher
+  implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
   private val topic = "cacheInvalidation"
   private val instanceId = s"$component${instance.toInt}"

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to