markusthoemmes closed pull request #3591: Move kafka/zookeeper hosts to pureconfig based configuration.
URL: https://github.com/apache/incubator-openwhisk/pull/3591
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
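
A note for readers skimming the diff: the PR drops the KAFKA_HOSTS and
ZOOKEEPER_HOSTS properties threaded through WhiskConfig and instead reads
broker and zookeeper addresses with pureconfig, under whisk.kafka.common and
whisk.zookeeper. In the Ansible templates this surfaces as environment
variables such as CONFIG_whisk_kafka_common_bootstrapHosts, which the
containers' startup turns into the Typesafe Config path
whisk.kafka.common.bootstrap-hosts (camelCase segments becoming kebab-case);
that mechanism is assumed here, it is not shown in the diff. A minimal,
self-contained sketch of the three loading patterns the diff uses (the HOCON
in the comment and the fallback value are illustrative, not from the PR):

    import pureconfig._

    // Illustrative HOCON (hypothetical values):
    //   whisk {
    //     kafka.common { bootstrap-hosts = "kafka0:9093,kafka1:9093" }
    //     zookeeper { hosts = "zk0:2181" }
    //   }
    case class ZookeeperConfig(hosts: String) // as introduced in Invoker.scala below

    object ConfigSketch extends App {
      // Whole block as a map, as the Kafka connectors below do:
      val kafkaCommon: Map[String, String] =
        loadConfigOrThrow[Map[String, String]]("whisk.kafka.common")

      // Single key, as KafkaConnectorTests does:
      val bootstrapHosts: String =
        loadConfigOrThrow[String]("whisk.kafka.common.bootstrap-hosts")

      // Case class with a fallback, as Invoker.scala does (its abort() is
      // replaced by a hypothetical default here):
      val zk = loadConfig[ZookeeperConfig]("whisk.zookeeper")
        .right.getOrElse(ZookeeperConfig("localhost:2181"))

      println(s"kafka: $bootstrapHosts, zookeeper: ${zk.hosts}")
    }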

diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 29dcc1b96a..4c46424117 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -128,7 +128,8 @@
       "WHISK_VERSION_DATE": "{{ whisk.version.date }}"
       "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
 
-      "KAFKA_HOSTS": "{{ kafka_connect_string }}"
+      "CONFIG_whisk_kafka_common_bootstrapHosts": 
+        "{{ kafka_connect_string }}"
       "CONFIG_whisk_kafka_replicationFactor":
         "{{ kafka.replicationFactor | default() }}"
       "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionBytes":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index ee4a79aaf3..843a52e3af 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -149,7 +149,7 @@
      "JMX_REMOTE": "{{ jmx.enabled }}"
      "COMPONENT_NAME": "invoker{{ groups['invokers'].index(inventory_hostname) }}"
      "PORT": 8080
-      "KAFKA_HOSTS": "{{ kafka_connect_string }}"
+      "CONFIG_whisk_kafka_common_bootstrapHosts": "{{ kafka_connect_string }}"
      "CONFIG_whisk_kafka_replicationFactor": "{{ kafka.replicationFactor | default() }}"
      "CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}"
      "CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}"
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 16cbe35415..ce8762c0c3 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -34,11 +34,9 @@ import scala.concurrent.{blocking, ExecutionContext, Future}
 
 case class KafkaConsumerConfig(sessionTimeoutMs: Long, metricFlushIntervalS: Int)
 
-class KafkaConsumerConnector(
-  kafkahost: String,
-  groupid: String,
-  topic: String,
-  override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
+class KafkaConsumerConnector(groupid: String, topic: String, override val maxPeek: Int = Int.MaxValue)(
+  implicit logging: Logging,
+  actorSystem: ActorSystem)
     extends MessageConsumer {
 
   implicit val ec: ExecutionContext = actorSystem.dispatcher
@@ -114,7 +112,6 @@ class KafkaConsumerConnector(
   private def createConsumer(topic: String) = {
     val config = Map(
       ConsumerConfig.GROUP_ID_CONFIG -> groupid,
-      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
       ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index e939a4637e..1e8e1eda87 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -21,11 +21,11 @@ import java.util.Properties
 import java.util.concurrent.ExecutionException
 
 import akka.actor.ActorSystem
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
 import pureconfig._
 import whisk.common.Logging
-import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.ConfigKeys
 import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
 
 import scala.collection.JavaConverters._
@@ -39,22 +39,21 @@ case class KafkaConfig(replicationFactor: Short)
 object KafkaMessagingProvider extends MessagingProvider {
   import KafkaConfiguration._
 
-  def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
+  def getConsumer(groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
     actorSystem: ActorSystem): MessageConsumer =
-    new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
+    new KafkaConsumerConnector(groupId, topic, maxPeek)
 
-  def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer =
-    new KafkaProducerConnector(config.kafkaHosts)
+  def getProducer()(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer =
+    new KafkaProducerConnector()
 
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
+  def ensureTopic(topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
     val tc = KafkaConfiguration.configMapToKafkaConfig(
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
 
-    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
     val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    val client = AdminClient.create(baseConfig ++ commonConfig)
+    val client = AdminClient.create(commonConfig)
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index f82acaf85d..dedf9bc548 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -33,8 +33,8 @@ import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
 
-class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
-                                                                                           actorSystem: ActorSystem)
+class KafkaProducerConnector(id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
+                                                                       actorSystem: ActorSystem)
     extends MessageProducer {
 
   implicit val ec: ExecutionContext = actorSystem.dispatcher
@@ -92,8 +92,7 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
   private val sentCounter = new Counter()
 
   private def createProducer(): KafkaProducer[String, String] = {
-    val config = Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahosts) ++
-      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+    val config = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
     new KafkaProducer(config, new StringSerializer, new StringSerializer)
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 0c937ae372..9949bf39bc 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -67,12 +67,10 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val controllerInstances = this(WhiskConfig.controllerInstances)
 
   val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort)
-  val kafkaHosts = this(WhiskConfig.kafkaHostList)
 
   val edgeHostName = this(WhiskConfig.edgeHostName)
 
   val invokerHosts = this(WhiskConfig.invokerHostsList)
-  val zookeeperHosts = this(WhiskConfig.zookeeperHostList)
 
   val dbPrefix = this(WhiskConfig.dbPrefix)
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
@@ -181,9 +179,6 @@ object WhiskConfig {
 
   val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold"
 
-  val kafkaHostList = "kafka.hosts"
-  val zookeeperHostList = "zookeeper.hosts"
-
   private val edgeHostApiPort = "edge.host.apiport"
 
   val invokerHostsList = "invoker.hosts"
@@ -191,8 +186,6 @@ object WhiskConfig {
 
   val edgeHost = Map(edgeHostName -> null, edgeHostApiPort -> null)
   val invokerHosts = Map(invokerHostsList -> null)
-  val kafkaHosts = Map(kafkaHostList -> null)
-  val zookeeperHosts = Map(zookeeperHostList -> null)
 
   val runtimesManifest = "runtimes.manifest"
 
@@ -209,6 +202,9 @@ object ConfigKeys {
   val loadbalancer = "whisk.loadbalancer"
 
   val couchdb = "whisk.couchdb"
+
+  val zookeeper = "whisk.zookeeper"
+
   val kafka = "whisk.kafka"
   val kafkaCommon = s"$kafka.common"
   val kafkaProducer = s"$kafka.producer"
@@ -228,7 +224,7 @@ object ConfigKeys {
 
   val docker = "whisk.docker"
   val dockerTimeouts = s"$docker.timeouts"
-  val dockerContainerFactory = s"${docker}.container-factory"
+  val dockerContainerFactory = s"$docker.container-factory"
   val runc = "whisk.runc"
   val runcTimeouts = s"$runc.timeouts"
   val containerFactory = "whisk.container-factory"
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 8ec1f5a55d..47eadc64b8 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
 import whisk.common.Logging
-import whisk.core.WhiskConfig
 import whisk.spi.Spi
 
 /**
@@ -30,11 +29,10 @@ import whisk.spi.Spi
  */
 trait MessagingProvider extends Spi {
   def getConsumer(
-    config: WhiskConfig,
     groupId: String,
     topic: String,
     maxPeek: Int = Int.MaxValue,
     maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer
-  def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
+  def getProducer()(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer
+  def ensureTopic(topic: String, topicConfig: String)(implicit logging: Logging): Boolean
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index c632c8303c..775a3f2f39 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -29,7 +29,6 @@ import akka.actor.ActorSystem
 import akka.actor.Props
 import spray.json._
 import whisk.common.Logging
-import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.core.connector.MessageFeed
 import whisk.core.connector.MessagingProvider
@@ -51,8 +50,7 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
   implicit val serdes = jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
 }
 
-class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging,
-                                                                                            as: ActorSystem) {
+class RemoteCacheInvalidation(component: String, instance: InstanceId)(implicit logging: Logging, as: ActorSystem) {
 
   implicit private val ec = as.dispatcher
 
@@ -61,8 +59,8 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
 
   private val msgProvider = SpiLoader.get[MessagingProvider]
   private val cacheInvalidationConsumer =
-    msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
-  private val cacheInvalidationProducer = msgProvider.getProducer(config)
+    msgProvider.getConsumer(s"$topic$instanceId", topic, maxPeek = 128)
+  private val cacheInvalidationProducer = msgProvider.getProducer()
 
   def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
     cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index c1c739075b..fe582dada3 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -107,7 +107,7 @@ class Controller(val instance: InstanceId,
   private implicit val entityStore = WhiskEntityStore.datastore()
   private implicit val activationStore = WhiskActivationStore.datastore()
   private implicit val cacheChangeNotification = Some(new CacheChangeNotification {
-    val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance)
+    val remoteCacheInvalidaton = new RemoteCacheInvalidation("controller", instance)
     override def apply(k: CacheKey) = {
       remoteCacheInvalidaton.invalidateWhiskActionMetaData(k)
       remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k)
@@ -116,7 +116,7 @@ class Controller(val instance: InstanceId,
 
   // initialize backend services
   private implicit val loadBalancer =
-    SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
+    SpiLoader.get[LoadBalancerProvider].loadBalancer(instance)
   logging.info(this, s"loadbalancer initialized: ${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
 
   private implicit val entitlementProvider =
@@ -168,7 +168,6 @@ object Controller {
     Map(WhiskConfig.controllerInstances -> null) ++
       ExecManifest.requiredProperties ++
       RestApiCommons.requiredProperties ++
-      SpiLoader.get[LoadBalancerProvider].requiredProperties ++
       EntitlementProvider.requiredProperties
 
   private def info(config: WhiskConfig, runtimes: Runtimes, apis: List[String]) =
@@ -216,13 +215,13 @@ object Controller {
     }
 
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) {
+    if (!msgProvider.ensureTopic(topic = "completed" + instance, topicConfig = "completed")) {
       abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
     }
-    if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) {
+    if (!msgProvider.ensureTopic(topic = "health", topicConfig = "health")) {
       abort(s"failure during msgProvider.ensureTopic for topic health")
     }
-    if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) {
+    if (!msgProvider.ensureTopic(topic = "cacheInvalidation", topicConfig = "cache-invalidation")) {
       abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
     }
 
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index 52ffd73d4e..69b21e006d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -21,7 +21,6 @@ import scala.concurrent.Future
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import whisk.common.{Logging, TransactionId}
-import whisk.core.WhiskConfig
 import whisk.core.connector._
 import whisk.core.entity._
 import whisk.spi.Spi
@@ -80,11 +79,10 @@ trait LoadBalancer {
  * An Spi for providing load balancer implementations.
  */
 trait LoadBalancerProvider extends Spi {
-  def requiredProperties: Map[String, String]
 
-  def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit actorSystem: ActorSystem,
-                                                                   logging: Logging,
-                                                                   materializer: ActorMaterializer): LoadBalancer
+  def loadBalancer(instance: InstanceId)(implicit actorSystem: ActorSystem,
+                                         logging: Logging,
+                                         materializer: ActorMaterializer): LoadBalancer
 }
 
 /** Exception thrown by the loadbalancer */
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 7e817e7411..8169b22cf4 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -32,10 +32,9 @@ import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
 import whisk.common.LoggingMarkers._
 import whisk.common._
-import whisk.core.WhiskConfig._
 import whisk.core.connector._
 import whisk.core.entity._
-import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.ConfigKeys
 import whisk.spi.SpiLoader
 
 import scala.annotation.tailrec
@@ -50,10 +49,9 @@ import scala.util.{Failure, Success}
  * Horizontal sharding means, that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
 * has at most 16 slots available, those will be divided to 8 slots for each loadbalancer (if there are 2).
 */
-class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(
-  implicit val actorSystem: ActorSystem,
-  logging: Logging,
-  materializer: ActorMaterializer)
+class ShardingContainerPoolBalancer(controllerInstance: InstanceId)(implicit val actorSystem: ActorSystem,
+                                                                    logging: Logging,
+                                                                    materializer: ActorMaterializer)
    extends LoadBalancer {
 
   private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
@@ -186,7 +184,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   }
 
   private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config)
+  private val messageProducer = messagingProvider.getProducer()
 
   /** 3. Send the activation to the invoker */
   private def sendActivationToInvoker(producer: MessageProducer,
@@ -222,7 +220,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   private val maxActiveAcksPerPoll = 128
   private val activeAckPollDuration = 1.second
   private val activeAckConsumer =
-    messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
+    messagingProvider.getConsumer(activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll)
 
   private val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed(
@@ -295,19 +293,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
       InvokerPool.props(
         (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
         (m, i) => sendActivationToInvoker(messageProducer, m, i),
-        messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128),
+        messagingProvider.getConsumer(s"health${controllerInstance.toInt}", "health", maxPeek = 128),
         Some(monitor)))
   }
 }
 
 object ShardingContainerPoolBalancer extends LoadBalancerProvider {
 
-  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
-    implicit actorSystem: ActorSystem,
-    logging: Logging,
-    materializer: ActorMaterializer): LoadBalancer = new ShardingContainerPoolBalancer(whiskConfig, instance)
-
-  def requiredProperties: Map[String, String] = kafkaHosts
+  override def loadBalancer(instance: InstanceId)(implicit actorSystem: ActorSystem,
+                                                  logging: Logging,
+                                                  materializer: ActorMaterializer): LoadBalancer =
+    new ShardingContainerPoolBalancer(instance)
 
   /** Generates a hash based on the string representation of namespace and action */
   def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): Int = {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3cf99..fee0eadc1e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -32,7 +32,7 @@ import akka.actor.CoordinatedShutdown
 import akka.stream.ActorMaterializer
 import whisk.common.AkkaLogging
 import whisk.common.Scheduler
-import whisk.core.WhiskConfig
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.WhiskConfig._
 import whisk.core.connector.MessagingProvider
 import whisk.core.connector.PingMessage
@@ -43,6 +43,9 @@ import whisk.http.{BasicHttpService, BasicRasService}
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
 import whisk.common.TransactionId
+import pureconfig._
+
+case class ZookeeperConfig(hosts: String)
 
 case class CmdLineArgs(name: Option[String] = None, id: Option[Int] = None)
 
@@ -54,8 +57,6 @@ object Invoker {
   def requiredProperties =
     Map(servicePort -> 8080.toString(), dockerRegistry -> null, dockerImagePrefix -> null) ++
       ExecManifest.requiredProperties ++
-      kafkaHosts ++
-      zookeeperHosts ++
       wskApiHost ++ Map(dockerImageTag -> "latest") ++
       Map(invokerName -> "")
 
@@ -117,16 +118,16 @@ object Invoker {
         id
       }
       .getOrElse {
-        if (config.zookeeperHosts.startsWith(":") || config.zookeeperHosts.endsWith(":")) {
-          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment (${config.zookeeperHosts})")
+        val zookeeperConfig = loadConfig[ZookeeperConfig](ConfigKeys.zookeeper).right.getOrElse {
+          abort(s"Must provide valid zookeeper host and port to use dynamicId assignment")
         }
         if (invokerName.isEmpty || invokerName.get.trim.isEmpty) {
           abort("Invoker name can't be empty to use dynamicId assignment.")
         }
 
-        logger.info(this, s"invokerReg: creating zkClient to ${config.zookeeperHosts}")
+        logger.info(this, s"invokerReg: creating zkClient to ${zookeeperConfig.hosts}")
         val retryPolicy = new RetryUntilElapsed(5000, 500) // retry at 500ms intervals until 5 seconds have elapsed
-        val zkClient = CuratorFrameworkFactory.newClient(config.zookeeperHosts, retryPolicy)
+        val zkClient = CuratorFrameworkFactory.newClient(zookeeperConfig.hosts, retryPolicy)
         zkClient.start()
         zkClient.blockUntilConnected()
         logger.info(this, "invokerReg: connected to zookeeper")
@@ -168,10 +169,10 @@ object Invoker {
 
     val invokerInstance = InstanceId(assignedInvokerId, invokerName)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
+    if (!msgProvider.ensureTopic(topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
      abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
-    val producer = msgProvider.getProducer(config)
+    val producer = msgProvider.getProducer()
     val invoker = try {
       new InvokerReactive(config, invokerInstance, producer)
     } catch {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 20cbbd46cd..0c90cc22fa 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -97,12 +97,8 @@ class InvokerReactive(
   private val topic = s"invoker${instance.toInt}"
   private val maximumContainers = poolConfig.maxActiveContainers
   private val msgProvider = SpiLoader.get[MessagingProvider]
-  private val consumer = msgProvider.getConsumer(
-    config,
-    topic,
-    topic,
-    maximumContainers,
-    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+  private val consumer =
+    msgProvider.getConsumer(topic, topic, maximumContainers, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
   private val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage)
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 62fa019d1e..eb51668ce0 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -21,6 +21,7 @@ whisk {
            }
        }
        common {
+          bootstrap-hosts: "{{ kafka_connect_string }}"
          security-protocol: {{ kafka.protocol }}
          ssl-truststore-location: {{ openwhisk_home }}/ansible/roles/kafka/files/{{ kafka.ssl.keystore.name }}
          ssl-truststore-password: {{ kafka.ssl.keystore.password }}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 3f91b59fd2..231df9278d 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -28,7 +28,7 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import whisk.common.TransactionId
 import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
-import whisk.core.WhiskConfig
+import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.core.connector.Message
 import whisk.utils.{retry, ExecutionContextFactory}
 
@@ -36,6 +36,7 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.concurrent.{Await, ExecutionContext}
 import scala.language.postfixOps
 import scala.util.Try
+import pureconfig._
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests
@@ -48,9 +49,6 @@ class KafkaConnectorTests
   implicit val transid: TransactionId = TransactionId.testing
   implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
-  val config = new WhiskConfig(WhiskConfig.kafkaHosts)
-  assert(config.isValid)
-
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
   val maxPollInterval = 10.seconds
@@ -58,17 +56,17 @@ class KafkaConnectorTests
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
   // one Kafka host but a replication factor of 1.
-  val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
+  val kafkaHosts: Array[String] = loadConfigOrThrow[String](ConfigKeys.kafkaCommon + ".bootstrap-hosts").split(',')
   val replicationFactor: Int = kafkaHosts.length / 2 + 1
   System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
+  assert(KafkaMessagingProvider.ensureTopic(topic, topic), s"Creation of topic $topic failed")
 
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
+  assert(KafkaMessagingProvider.ensureTopic(topic, topic), s"Creation of topic $topic failed")
 
-  val producer = new KafkaProducerConnector(config.kafkaHosts)
-  val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
+  val producer = new KafkaProducerConnector()
+  val consumer = new KafkaConsumerConnector(groupid, topic)
 
   override def afterAll(): Unit = {
     producer.close()
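
End of diff. Net effect on call sites: topic management, producers, and
consumers no longer take a WhiskConfig. A hedged sketch of the resulting
usage (constructor and method shapes are taken from the diff above; the
topic and group names are hypothetical):

    import akka.actor.ActorSystem
    import whisk.common.Logging
    import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}

    class PostPrUsageSketch(implicit logging: Logging, actorSystem: ActorSystem) {
      // Broker addresses now come from whisk.kafka.common.bootstrap-hosts
      // via pureconfig instead of a constructor argument ("invoker" is a
      // real topicConfig name from the diff; "someTopic" is made up).
      require(KafkaMessagingProvider.ensureTopic("someTopic", "invoker"), "Creation of topic failed")

      val producer = new KafkaProducerConnector()
      val consumer = new KafkaConsumerConnector("some-group", "someTopic", maxPeek = 128)
    }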


 
