This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new aa85fc9  controller creates health and cacheInvalidation topics (#2991)
aa85fc9 is described below

commit aa85fc9b9bf32eb78309df5c9390a195375cfb7d
Author: David Grove <dgrove-...@users.noreply.github.com>
AuthorDate: Fri Dec 8 13:15:26 2017 -0500

    controller creates health and cacheInvalidation topics (#2991)
    
    Move creation of the health and cacheInvalidation topics
    from ansible to controller startup using ensureTopic
    from MessagingProvider SPI.
    
    Also convert kafka topic configuration to pureconfig.
---
 ansible/environments/local/group_vars/all          |  2 +-
 ansible/group_vars/all                             | 17 ------------
 ansible/roles/controller/tasks/deploy.yml          | 14 +++++++---
 ansible/roles/invoker/tasks/deploy.yml             |  8 +++---
 ansible/roles/kafka/tasks/deploy.yml               | 12 ---------
 common/scala/src/main/resources/application.conf   | 31 +++++++++++++++++++++-
 .../connector/kafka/KafkaMessagingProvider.scala   | 24 ++++++++++++-----
 .../src/main/scala/whisk/core/WhiskConfig.scala    | 16 -----------
 .../whisk/core/connector/MessagingProvider.scala   |  3 +--
 .../scala/whisk/core/controller/Controller.scala   | 16 +++++------
 .../core/loadBalancer/LoadBalancerService.scala    | 13 ++++-----
 .../main/scala/whisk/core/invoker/Invoker.scala    | 16 +----------
 12 files changed, 79 insertions(+), 93 deletions(-)

diff --git a/ansible/environments/local/group_vars/all 
b/ansible/environments/local/group_vars/all
index c3b3c12..0056e96 100755
--- a/ansible/environments/local/group_vars/all
+++ b/ansible/environments/local/group_vars/all
@@ -24,7 +24,7 @@ invoker_arguments: "{{ controller_arguments }}"
 
 invoker_allow_multiple_instances: true
 
-# Set kafka topic retention
+# Set kafka configuration
 kafka_heap: '512m'
 kafka_topics_completed_retentionBytes: 104857600
 kafka_topics_completed_retentionMS: 300000
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 7467aee..ad77a88 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -135,23 +135,6 @@ kafka:
     port: 8093
   heap: "{{ kafka_heap | default('1g') }}"
   replicationFactor: "{{ kafka_replicationFactor | 
default((groups['kafkas']|length)|int) }}"
-  topics:
-    completed:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_completed_retentionBytes | 
default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) 
}}"
-    health:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_health_retentionBytes | 
default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}"
-    invoker:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_invoker_retentionBytes | 
default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) 
}}"
-    cacheInvalidation:
-      segmentBytes: 536870912
-      retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | 
default(1073741824) }}"
-      retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | 
default(300000) }}"
 
 kafka_connect_string: "{% set ret = [] %}\
                        {% for host in groups['kafkas'] %}\
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index 6caa545..82db5d3 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -52,10 +52,16 @@
       "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}"
 
       "KAFKA_HOSTS": "{{ kafka_connect_string }}"
-      "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ 
kafka.topics.completed.retentionBytes }}"
-      "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ 
kafka.topics.completed.retentionMS }}"
-      "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ 
kafka.topics.completed.segmentBytes }}"
-      "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}"
+      "CONFIG_whisk_kafka_replicationFactor": "{{ kafka.replicationFactor | 
default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionBytes": "{{ 
kafka_topics_cacheInvalidation_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionMs": "{{ 
kafka_topics_cacheInvalidation_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_cacheInvalidation_segmentBytes": "{{ 
kafka_topics_cacheInvalidation_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_retentionBytes": "{{ 
kafka_topics_completed_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_retentionMs": "{{ 
kafka_topics_completed_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_completed_segmentBytes": "{{ 
kafka_topics_completed_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ 
kafka_topics_health_retentionBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ 
kafka_topics_health_retentionMS | default() }}"
+      "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ 
kafka_topics_health_segmentBytes | default() }}"
 
       "DB_PROTOCOL": "{{ db_protocol }}"
       "DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 112b37b..91d381f 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -118,10 +118,10 @@
         -e COMPONENT_NAME='invoker{{ 
groups['invokers'].index(inventory_hostname) }}'
         -e PORT='8080'
         -e KAFKA_HOSTS='{{ kafka_connect_string }}'
-        -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ 
kafka.topics.invoker.retentionBytes }}'
-        -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ 
kafka.topics.invoker.retentionMS }}'
-        -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ 
kafka.topics.invoker.segmentBytes }}'
-        -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}'
+        -e CONFIG_whisk_kafka_replicationFactor='{{ kafka.replicationFactor | 
default() }}'
+        -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ 
kafka_topics_invoker_retentionBytes | default() }}'
+        -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ 
kafka_topics_invoker_retentionMS | default() }}'
+        -e CONFIG_whisk_kafka_topics_invoker_segmentBytes='{{ 
kafka_topics_invoker_segmentBytes | default() }}'
         -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}'
         -e DB_PROTOCOL='{{ db_protocol }}'
         -e DB_PROVIDER='{{ db_provider }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml 
b/ansible/roles/kafka/tasks/deploy.yml
index 3d5845b..7c6feda 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -32,15 +32,3 @@
   until: (('[Kafka Server ' + 
(groups['kafkas'].index(inventory_hostname)|string) + '], started') in 
result.stdout)
   retries: 10
   delay: 5
-
-- name: create the health and the cacheInvalidation topic
-  shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create 
--topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} 
--partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ 
item.settings.retentionBytes }} --config retention.ms={{ 
item.settings.retentionMS }} --config segment.bytes={{ 
item.settings.segmentBytes }}'"
-  register: command_result
-  failed_when: "not ('Created topic' in command_result.stdout or 'already 
exists' in command_result.stdout)"
-  changed_when: "'Created topic' in command_result.stdout"
-  with_items:
-  - name: health
-    settings: "{{ kafka.topics.health }}"
-  - name: cacheInvalidation
-    settings: "{{ kafka.topics.cacheInvalidation }}"
-  when: groups['kafkas'].index(inventory_hostname ) == 0
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 41b41ce..dc2d4b1 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -47,4 +47,33 @@ kamon {
     modules {
         kamon-statsd.auto-start = yes
     }
-}
\ No newline at end of file
+}
+
+whisk {
+    # kafka related configuration
+    kafka {
+        replication-factor = 1
+        topics {
+            cache-invalidation {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 300000
+            }
+            completed {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+            health {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+            invoker {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 172800000
+            }
+        }
+    }
+}
diff --git 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 599b5bf..1cc1ea2 100644
--- 
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ 
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -35,6 +35,19 @@ import whisk.core.connector.MessageConsumer
 import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
 
+import pureconfig._
+
+case class KafkaConfig(replicationFactor: Short)
+
+case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) {
+  def toMap: Map[String, String] = {
+    Map(
+      "retention.bytes" -> retentionBytes.toString,
+      "retention.ms" -> retentionMs.toString,
+      "segment.bytes" -> segmentBytes.toString)
+  }
+}
+
 /**
  * A Kafka based implementation of MessagingProvider
  */
@@ -46,15 +59,14 @@ object KafkaMessagingProvider extends MessagingProvider {
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: 
Logging): MessageProducer =
     new KafkaProducerConnector(config.kafkaHosts, ec)
 
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, 
String])(
-    implicit logging: Logging): Boolean = {
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: 
String)(implicit logging: Logging): Boolean = {
+    val kc = loadConfigOrThrow[KafkaConfig]("whisk.kafka")
+    val tc = loadConfigOrThrow[TopicConfig](s"whisk.kafka.topics.$topicConfig")
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
     val client = AdminClient.create(props)
-    val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
-    val replicationFactor = topicConfig.getOrElse("replicationFactor", 
"1").toShort
-    val nt = new NewTopic(topic, numPartitions, replicationFactor)
-      .configs((topicConfig - ("numPartitions", "replicationFactor")).asJava)
+    val numPartitions = 1
+    val nt = new NewTopic(topic, numPartitions, 
kc.replicationFactor).configs(tc.toMap.asJava)
     val results = client.createTopics(List(nt).asJava)
     try {
       results.values().get(topic).get()
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 36c17af..96cf1bc 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -96,14 +96,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val dbActivations = this(WhiskConfig.dbActivations)
   val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
 
-  val kafkaTopicsInvokerRetentionBytes = 
this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
-  val kafkaTopicsInvokerRetentionMS = 
this(WhiskConfig.kafkaTopicsInvokerRetentionMS)
-  val kafkaTopicsInvokerSegmentBytes = 
this(WhiskConfig.kafkaTopicsInvokerSegmentBytes)
-  val kafkaTopicsCompletedRetentionBytes = 
this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
-  val kafkaTopicsCompletedRetentionMS = 
this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
-  val kafkaTopicsCompletedSegmentBytes = 
this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
-  val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor)
-
   val runtimesManifest = this(WhiskConfig.runtimesManifest)
   val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
   val actionInvokeConcurrentLimit = 
this(WhiskConfig.actionInvokeConcurrentLimit)
@@ -242,14 +234,6 @@ object WhiskConfig {
 
   val runtimesManifest = "runtimes.manifest"
 
-  val kafkaTopicsInvokerRetentionBytes = "kafka.topics.invoker.retentionBytes"
-  val kafkaTopicsInvokerRetentionMS = "kafka.topics.invoker.retentionMS"
-  val kafkaTopicsInvokerSegmentBytes = "kafka.topics.invoker.segmentBytes"
-  val kafkaTopicsCompletedRetentionBytes = 
"kafka.topics.completed.retentionBytes"
-  val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
-  val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
-  val kafkaReplicationFactor = "kafka.replicationFactor"
-
   val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
   val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
   val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
diff --git 
a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala 
b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 7772723..1737624 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -34,6 +34,5 @@ trait MessagingProvider extends Spi {
                   maxPeek: Int = Int.MaxValue,
                   maxPollInterval: FiniteDuration = 5.minutes)(implicit 
logging: Logging): MessageConsumer
   def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: 
Logging): MessageProducer
-  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, 
String])(
-    implicit logging: Logging): Boolean
+  def ensureTopic(config: WhiskConfig, topic: String, topicConfig: 
String)(implicit logging: Logging): Boolean
 }
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala 
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 80a6a90..08b8f1d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -212,17 +212,15 @@ object Controller {
     }
 
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(
-          config,
-          "completed" + instance,
-          Map(
-            "numPartitions" -> "1",
-            "replicationFactor" -> config.kafkaReplicationFactor,
-            "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
-            "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
-            "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
+    if (!msgProvider.ensureTopic(config, topic = "completed" + instance, 
topicConfig = "completed")) {
       abort(s"failure during msgProvider.ensureTopic for topic completed$instance")
     }
+    if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = 
"health")) {
+      abort(s"failure during msgProvider.ensureTopic for topic health")
+    }
+    if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", 
topicConfig = "cache-invalidation")) {
+      abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation")
+    }
 
     ExecManifest.initialize(config) match {
       case Success(_) =>
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index 5be950d..e3776ae 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -45,7 +45,13 @@ import whisk.core.connector.MessageProducer
 import whisk.core.connector.MessagingProvider
 import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.size._
+import whisk.core.entity.{ActivationId, WhiskActivation}
+import whisk.core.entity.EntityName
+import whisk.core.entity.ExecutableWhiskActionMetaData
+import whisk.core.entity.Identity
+import whisk.core.entity.InstanceId
+import whisk.core.entity.UUID
+import whisk.core.entity.WhiskAction
 import whisk.core.entity.types.EntityStore
 import whisk.spi.SpiLoader
 import pureconfig._
@@ -345,11 +351,6 @@ class LoadBalancerService(config: WhiskConfig, instance: 
InstanceId, entityStore
 object LoadBalancerService {
   def requiredProperties =
     kafkaHosts ++
-      Map(
-        kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
-        kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
-        kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString,
-        kafkaReplicationFactor -> "1") ++
       Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
 
   /** Memoizes the result of `f` for later use. */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1a71e29..77e3da8 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -43,7 +43,6 @@ import whisk.core.entity.ExecManifest
 import whisk.core.entity.InstanceId
 import whisk.core.entity.WhiskActivationStore
 import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.size._
 import whisk.http.BasicHttpService
 import whisk.spi.SpiLoader
 import whisk.utils.ExecutionContextFactory
@@ -62,11 +61,6 @@ object Invoker {
       WhiskEntityStore.requiredProperties ++
       WhiskActivationStore.requiredProperties ++
       kafkaHosts ++
-      Map(
-        kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
-        kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
-        kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString,
-        kafkaReplicationFactor -> "1") ++
       zookeeperHosts ++
       wskApiHost ++ Map(
       dockerImageTag -> "latest",
@@ -188,15 +182,7 @@ object Invoker {
 
     val invokerInstance = InstanceId(assignedInvokerId)
     val msgProvider = SpiLoader.get[MessagingProvider]
-    if (!msgProvider.ensureTopic(
-          config,
-          "invoker" + assignedInvokerId,
-          Map(
-            "numPartitions" -> "1",
-            "replicationFactor" -> config.kafkaReplicationFactor,
-            "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
-            "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
-            "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
+    if (!msgProvider.ensureTopic(config, topic = "invoker" + 
assignedInvokerId, topicConfig = "invoker")) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
     val producer = msgProvider.getProducer(config, ec)

-- 
To stop receiving notification emails like this one, please contact
commits@openwhisk.apache.org.

Reply via email to