This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new aa85fc9 controller creates health and cacheInvalidation topics (#2991) aa85fc9 is described below commit aa85fc9b9bf32eb78309df5c9390a195375cfb7d Author: David Grove <dgrove-...@users.noreply.github.com> AuthorDate: Fri Dec 8 13:15:26 2017 -0500 controller creates health and cacheInvalidation topics (#2991) Move creation of the health and cacheInvalidation topics from ansible to controller startup using ensureTopic from MessagingProvider SPI. Also convert kafka topic configuration to pureconfig. --- ansible/environments/local/group_vars/all | 2 +- ansible/group_vars/all | 17 ------------ ansible/roles/controller/tasks/deploy.yml | 14 +++++++--- ansible/roles/invoker/tasks/deploy.yml | 8 +++--- ansible/roles/kafka/tasks/deploy.yml | 12 --------- common/scala/src/main/resources/application.conf | 31 +++++++++++++++++++++- .../connector/kafka/KafkaMessagingProvider.scala | 24 ++++++++++++----- .../src/main/scala/whisk/core/WhiskConfig.scala | 16 ----------- .../whisk/core/connector/MessagingProvider.scala | 3 +-- .../scala/whisk/core/controller/Controller.scala | 16 +++++------ .../core/loadBalancer/LoadBalancerService.scala | 13 ++++----- .../main/scala/whisk/core/invoker/Invoker.scala | 16 +---------- 12 files changed, 79 insertions(+), 93 deletions(-) diff --git a/ansible/environments/local/group_vars/all b/ansible/environments/local/group_vars/all index c3b3c12..0056e96 100755 --- a/ansible/environments/local/group_vars/all +++ b/ansible/environments/local/group_vars/all @@ -24,7 +24,7 @@ invoker_arguments: "{{ controller_arguments }}" invoker_allow_multiple_instances: true -# Set kafka topic retention +# Set kafka configuration kafka_heap: '512m' kafka_topics_completed_retentionBytes: 104857600 kafka_topics_completed_retentionMS: 300000 diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 7467aee..ad77a88 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ 
-135,23 +135,6 @@ kafka: port: 8093 heap: "{{ kafka_heap | default('1g') }}" replicationFactor: "{{ kafka_replicationFactor | default((groups['kafkas']|length)|int) }}" - topics: - completed: - segmentBytes: 536870912 - retentionBytes: "{{ kafka_topics_completed_retentionBytes | default(1073741824) }}" - retentionMS: "{{ kafka_topics_completed_retentionMS | default(3600000) }}" - health: - segmentBytes: 536870912 - retentionBytes: "{{ kafka_topics_health_retentionBytes | default(1073741824) }}" - retentionMS: "{{ kafka_topics_health_retentionMS | default(3600000) }}" - invoker: - segmentBytes: 536870912 - retentionBytes: "{{ kafka_topics_invoker_retentionBytes | default(1073741824) }}" - retentionMS: "{{ kafka_topics_invoker_retentionMS | default(172800000) }}" - cacheInvalidation: - segmentBytes: 536870912 - retentionBytes: "{{ kafka_topics_cacheInvalidation_retentionBytes | default(1073741824) }}" - retentionMS: "{{ kafka_topics_cacheInvalidation_retentionMS | default(300000) }}" kafka_connect_string: "{% set ret = [] %}\ {% for host in groups['kafkas'] %}\ diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 6caa545..82db5d3 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -52,10 +52,16 @@ "WHISK_VERSION_BUILDNO": "{{ docker.image.tag }}" "KAFKA_HOSTS": "{{ kafka_connect_string }}" - "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{ kafka.topics.completed.retentionBytes }}" - "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{ kafka.topics.completed.retentionMS }}" - "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{ kafka.topics.completed.segmentBytes }}" - "KAFKA_REPLICATIONFACTOR": "{{ kafka.replicationFactor }}" + "CONFIG_whisk_kafka_replicationFactor": "{{ kafka.replicationFactor | default() }}" + "CONFIG_whisk_kafka_topics_cacheInvalidation_retentionBytes": "{{ kafka_topics_cacheInvalidation_retentionBytes | default() }}" + 
"CONFIG_whisk_kafka_topics_cacheInvalidation_retentionMs": "{{ kafka_topics_cacheInvalidation_retentionMS | default() }}" + "CONFIG_whisk_kafka_topics_cacheInvalidation_segmentBytes": "{{ kafka_topics_cacheInvalidation_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_completed_retentionBytes": "{{ kafka_topics_completed_retentionBytes | default() }}" + "CONFIG_whisk_kafka_topics_completed_retentionMs": "{{ kafka_topics_completed_retentionMS | default() }}" + "CONFIG_whisk_kafka_topics_completed_segmentBytes": "{{ kafka_topics_completed_segmentBytes | default() }}" + "CONFIG_whisk_kafka_topics_health_retentionBytes": "{{ kafka_topics_health_retentionBytes | default() }}" + "CONFIG_whisk_kafka_topics_health_retentionMs": "{{ kafka_topics_health_retentionMS | default() }}" + "CONFIG_whisk_kafka_topics_health_segmentBytes": "{{ kafka_topics_health_segmentBytes | default() }}" "DB_PROTOCOL": "{{ db_protocol }}" "DB_PROVIDER": "{{ db_provider }}" diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 112b37b..91d381f 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -118,10 +118,10 @@ -e COMPONENT_NAME='invoker{{ groups['invokers'].index(inventory_hostname) }}' -e PORT='8080' -e KAFKA_HOSTS='{{ kafka_connect_string }}' - -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{ kafka.topics.invoker.retentionBytes }}' - -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{ kafka.topics.invoker.retentionMS }}' - -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{ kafka.topics.invoker.segmentBytes }}' - -e KAFKA_REPLICATIONFACTOR='{{ kafka.replicationFactor }}' + -e CONFIG_whisk_kafka_replicationFactor='{{ kafka.replicationFactor | default() }}' + -e CONFIG_whisk_kafka_topics_invoker_retentionBytes='{{ kafka_topics_invoker_retentionBytes | default() }}' + -e CONFIG_whisk_kafka_topics_invoker_retentionMs='{{ kafka_topics_invoker_retentionMS | default() }}' + -e 
CONFIG_whisk_kafka_topics_invoker_segmentBytes='{{ kafka_topics_invoker_segmentBytes | default() }}' -e ZOOKEEPER_HOSTS='{{ zookeeper_connect_string }}' -e DB_PROTOCOL='{{ db_protocol }}' -e DB_PROVIDER='{{ db_provider }}' diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml index 3d5845b..7c6feda 100644 --- a/ansible/roles/kafka/tasks/deploy.yml +++ b/ansible/roles/kafka/tasks/deploy.yml @@ -32,15 +32,3 @@ until: (('[Kafka Server ' + (groups['kafkas'].index(inventory_hostname)|string) + '], started') in result.stdout) retries: 10 delay: 5 - -- name: create the health and the cacheInvalidation topic - shell: "docker exec kafka0 bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item.name }} --replication-factor {{ kafka.replicationFactor }} --partitions 1 --zookeeper $KAFKA_ZOOKEEPER_CONNECT --config retention.bytes={{ item.settings.retentionBytes }} --config retention.ms={{ item.settings.retentionMS }} --config segment.bytes={{ item.settings.segmentBytes }}'" - register: command_result - failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)" - changed_when: "'Created topic' in command_result.stdout" - with_items: - - name: health - settings: "{{ kafka.topics.health }}" - - name: cacheInvalidation - settings: "{{ kafka.topics.cacheInvalidation }}" - when: groups['kafkas'].index(inventory_hostname ) == 0 diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 41b41ce..dc2d4b1 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -47,4 +47,33 @@ kamon { modules { kamon-statsd.auto-start = yes } -} \ No newline at end of file +} + +whisk { + # kafka related configuration + kafka { + replication-factor = 1 + topics { + cache-invalidation { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 300000 + } + completed { + 
segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 3600000 + } + health { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 3600000 + } + invoker { + segment-bytes = 536870912 + retention-bytes = 1073741824 + retention-ms = 172800000 + } + } + } +} diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala index 599b5bf..1cc1ea2 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -35,6 +35,19 @@ import whisk.core.connector.MessageConsumer import whisk.core.connector.MessageProducer import whisk.core.connector.MessagingProvider +import pureconfig._ + +case class KafkaConfig(replicationFactor: Short) + +case class TopicConfig(segmentBytes: Long, retentionBytes: Long, retentionMs: Long) { + def toMap: Map[String, String] = { + Map( + "retention.bytes" -> retentionBytes.toString, + "retention.ms" -> retentionMs.toString, + "segment.bytes" -> segmentBytes.toString) + } +} + /** * A Kafka based implementation of MessagingProvider */ @@ -46,15 +59,14 @@ object KafkaMessagingProvider extends MessagingProvider { def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer = new KafkaProducerConnector(config.kafkaHosts, ec) - def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])( - implicit logging: Logging): Boolean = { + def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = { + val kc = loadConfigOrThrow[KafkaConfig]("whisk.kafka") + val tc = loadConfigOrThrow[TopicConfig](s"whisk.kafka.topics.$topicConfig") val props = new Properties props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts) val client = AdminClient.create(props) - val 
numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt - val replicationFactor = topicConfig.getOrElse("replicationFactor", "1").toShort - val nt = new NewTopic(topic, numPartitions, replicationFactor) - .configs((topicConfig - ("numPartitions", "replicationFactor")).asJava) + val numPartitions = 1 + val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.toMap.asJava) val results = client.createTopics(List(nt).asJava) try { results.values().get(topic).get() diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 36c17af..96cf1bc 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -96,14 +96,6 @@ class WhiskConfig(requiredProperties: Map[String, String], val dbActivations = this(WhiskConfig.dbActivations) val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint) - val kafkaTopicsInvokerRetentionBytes = this(WhiskConfig.kafkaTopicsInvokerRetentionBytes) - val kafkaTopicsInvokerRetentionMS = this(WhiskConfig.kafkaTopicsInvokerRetentionMS) - val kafkaTopicsInvokerSegmentBytes = this(WhiskConfig.kafkaTopicsInvokerSegmentBytes) - val kafkaTopicsCompletedRetentionBytes = this(WhiskConfig.kafkaTopicsCompletedRetentionBytes) - val kafkaTopicsCompletedRetentionMS = this(WhiskConfig.kafkaTopicsCompletedRetentionMS) - val kafkaTopicsCompletedSegmentBytes = this(WhiskConfig.kafkaTopicsCompletedSegmentBytes) - val kafkaReplicationFactor = this(WhiskConfig.kafkaReplicationFactor) - val runtimesManifest = this(WhiskConfig.runtimesManifest) val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit) val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit) @@ -242,14 +234,6 @@ object WhiskConfig { val runtimesManifest = "runtimes.manifest" - val kafkaTopicsInvokerRetentionBytes = "kafka.topics.invoker.retentionBytes" - val 
kafkaTopicsInvokerRetentionMS = "kafka.topics.invoker.retentionMS" - val kafkaTopicsInvokerSegmentBytes = "kafka.topics.invoker.segmentBytes" - val kafkaTopicsCompletedRetentionBytes = "kafka.topics.completed.retentionBytes" - val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS" - val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes" - val kafkaReplicationFactor = "kafka.replicationFactor" - val actionSequenceMaxLimit = "limits.actions.sequence.maxLength" val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute" val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent" diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala index 7772723..1737624 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala @@ -34,6 +34,5 @@ trait MessagingProvider extends Spi { maxPeek: Int = Int.MaxValue, maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer - def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String, String])( - implicit logging: Logging): Boolean + def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean } diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index 80a6a90..08b8f1d 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -212,17 +212,15 @@ object Controller { } val msgProvider = SpiLoader.get[MessagingProvider] - if (!msgProvider.ensureTopic( - config, - "completed" + 
instance, - Map( - "numPartitions" -> "1", - "replicationFactor" -> config.kafkaReplicationFactor, - "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes, - "retention.ms" -> config.kafkaTopicsCompletedRetentionMS, - "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) { + if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) { abort(s"failure during msgProvider.ensureTopic for topic completed$instance") } + if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) { + abort(s"failure during msgProvider.ensureTopic for topic health") + } + if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) { + abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation") + } ExecManifest.initialize(config) match { case Success(_) => diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 5be950d..e3776ae 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -45,7 +45,13 @@ import whisk.core.connector.MessageProducer import whisk.core.connector.MessagingProvider import whisk.core.database.NoDocumentException import whisk.core.entity._ -import whisk.core.entity.size._ +import whisk.core.entity.{ActivationId, WhiskActivation} +import whisk.core.entity.EntityName +import whisk.core.entity.ExecutableWhiskActionMetaData +import whisk.core.entity.Identity +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID +import whisk.core.entity.WhiskAction import whisk.core.entity.types.EntityStore import whisk.spi.SpiLoader import pureconfig._ @@ -345,11 +351,6 @@ class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore object LoadBalancerService { def 
requiredProperties = kafkaHosts ++ - Map( - kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString, - kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString, - kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString, - kafkaReplicationFactor -> "1") ++ Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null) /** Memoizes the result of `f` for later use. */ diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 1a71e29..77e3da8 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -43,7 +43,6 @@ import whisk.core.entity.ExecManifest import whisk.core.entity.InstanceId import whisk.core.entity.WhiskActivationStore import whisk.core.entity.WhiskEntityStore -import whisk.core.entity.size._ import whisk.http.BasicHttpService import whisk.spi.SpiLoader import whisk.utils.ExecutionContextFactory @@ -62,11 +61,6 @@ object Invoker { WhiskEntityStore.requiredProperties ++ WhiskActivationStore.requiredProperties ++ kafkaHosts ++ - Map( - kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString, - kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString, - kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString, - kafkaReplicationFactor -> "1") ++ zookeeperHosts ++ wskApiHost ++ Map( dockerImageTag -> "latest", @@ -188,15 +182,7 @@ object Invoker { val invokerInstance = InstanceId(assignedInvokerId) val msgProvider = SpiLoader.get[MessagingProvider] - if (!msgProvider.ensureTopic( - config, - "invoker" + assignedInvokerId, - Map( - "numPartitions" -> "1", - "replicationFactor" -> config.kafkaReplicationFactor, - "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes, - "retention.ms" -> config.kafkaTopicsInvokerRetentionMS, - "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) { + if (!msgProvider.ensureTopic(config, topic = "invoker" + 
assignedInvokerId, topicConfig = "invoker")) { abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId") } val producer = msgProvider.getProducer(config, ec) -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.