dgrove-oss closed pull request #2988: Revert "Programmatic lazy creation of
completedN and invokerN topics"
URL: https://github.com/apache/incubator-openwhisk/pull/2988
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/ansible/roles/controller/tasks/deploy.yml
b/ansible/roles/controller/tasks/deploy.yml
index 2e5d2765da..7b3157fd18 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -53,9 +53,6 @@
"KAFKA_HOST": "{{ groups['kafka']|first }}"
"KAFKA_HOST_PORT": "{{ kafka.port }}"
- "KAFKA_TOPICS_COMPLETED_RETENTION_BYTES": "{{
kafka.topics.completed.retentionBytes }}"
- "KAFKA_TOPICS_COMPLETED_RETENTION_MS": "{{
kafka.topics.completed.retentionMS }}"
- "KAFKA_TOPICS_COMPLETED_SEGMENT_BYTES": "{{
kafka.topics.completed.segmentBytes }}"
"DB_PROTOCOL": "{{ db_protocol }}"
"DB_PROVIDER": "{{ db_provider }}"
diff --git a/ansible/roles/invoker/tasks/deploy.yml
b/ansible/roles/invoker/tasks/deploy.yml
index 560d64a68c..b7d9fc2c2f 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -119,9 +119,6 @@
-e PORT='8080'
-e KAFKA_HOST='{{ groups['kafka']|first }}'
-e KAFKA_HOST_PORT='{{ kafka.port }}'
- -e KAFKA_TOPICS_INVOKER_RETENTION_BYTES='{{
kafka.topics.invoker.retentionBytes }}'
- -e KAFKA_TOPICS_INVOKER_RETENTION_MS='{{
kafka.topics.invoker.retentionMS }}'
- -e KAFKA_TOPICS_INVOKER_SEGMENT_BYTES='{{
kafka.topics.invoker.segmentBytes }}'
-e ZOOKEEPER_HOST='{{ groups['kafka']|first }}'
-e ZOOKEEPER_HOST_PORT='{{ zookeeper.port }}'
-e DB_PROTOCOL='{{ db_protocol }}'
diff --git a/ansible/roles/kafka/tasks/deploy.yml
b/ansible/roles/kafka/tasks/deploy.yml
index f410cc3502..dccae1b028 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -60,3 +60,17 @@
settings: "{{ kafka.topics.health }}"
- name: cacheInvalidation
settings: "{{ kafka.topics.cacheInvalidation }}"
+
+- name: create the active-ack topics
+ shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create
--topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper
{{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{
kafka.topics.completed.retentionBytes }} --config retention.ms={{
kafka.topics.completed.retentionMS }} --config segment.bytes={{
kafka.topics.completed.segmentBytes }}'"
+ with_indexed_items: "{{ groups['controllers'] }}"
+ register: command_result
+ failed_when: "not ('Created topic' in command_result.stdout or 'already
exists' in command_result.stdout)"
+ changed_when: "'Created topic' in command_result.stdout"
+
+- name: create the invoker topics
+ shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create
--topic invoker{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper
{{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{
kafka.topics.invoker.retentionBytes }} --config retention.ms={{
kafka.topics.invoker.retentionMS }} --config segment.bytes={{
kafka.topics.invoker.segmentBytes }}'"
+ with_indexed_items: "{{ groups['invokers'] }}"
+ register: command_result
+ failed_when: "not ('Created topic' in command_result.stdout or 'already
exists' in command_result.stdout)"
+ changed_when: "'Created topic' in command_result.stdout"
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 145b61a92e..aa52e5b456 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -25,7 +25,7 @@ dependencies {
compile 'commons-codec:commons-codec:1.9'
compile 'commons-io:commons-io:2.4'
compile 'commons-collections:commons-collections:3.2.2'
- compile 'org.apache.kafka:kafka-clients:0.11.0.1'
+ compile 'org.apache.kafka:kafka-clients:0.10.2.1'
compile ('org.apache.httpcomponents:httpclient:4.4.1') {
exclude group: 'commons-logging'
}
diff --git
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index fc6b593044..ff6daf1b9e 100644
---
a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++
b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -17,18 +17,8 @@
package whisk.connector.kafka
-import java.util.Properties
-import java.util.concurrent.ExecutionException
-
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
-import scala.collection.JavaConverters._
-
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.errors.TopicExistsException
-
import whisk.common.Logging
import whisk.core.WhiskConfig
import whisk.core.connector.MessageConsumer
@@ -45,30 +35,4 @@ object KafkaMessagingProvider extends MessagingProvider {
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging:
Logging): MessageProducer =
new KafkaProducerConnector(config.kafkaHost, ec)
-
- def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String,
String])(
- implicit logging: Logging): Boolean = {
- val props = new Properties
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHost)
- val client = AdminClient.create(props)
- val numPartitions = topicConfig.getOrElse("numPartitions", "1").toInt
- val replicationFactor = topicConfig.getOrElse("replicationFactor",
"1").toShort
- val nt = new NewTopic(topic, numPartitions, replicationFactor)
- .configs((topicConfig - ("numPartitions", "replicationFactor")).asJava)
- val results = client.createTopics(List(nt).asJava)
- try {
- results.values().get(topic).get()
- logging.info(this, s"created topic $topic")
- true
- } catch {
- case e: ExecutionException if
e.getCause.isInstanceOf[TopicExistsException] =>
- logging.info(this, s"topic $topic already existed")
- true
- case _: Exception =>
- logging.error(this, s"exception during creation of topic $topic")
- false
- } finally {
- client.close()
- }
- }
}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 7a724d0568..bc63dc2bea 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -97,13 +97,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val mainDockerEndpoint = this(WhiskConfig.mainDockerEndpoint)
- val kafkaTopicsInvokerRetentionBytes =
this(WhiskConfig.kafkaTopicsInvokerRetentionBytes)
- val kafkaTopicsInvokerRetentionMS =
this(WhiskConfig.kafkaTopicsInvokerRetentionMS)
- val kafkaTopicsInvokerSegmentBytes =
this(WhiskConfig.kafkaTopicsInvokerSegmentBytes)
- val kafkaTopicsCompletedRetentionBytes =
this(WhiskConfig.kafkaTopicsCompletedRetentionBytes)
- val kafkaTopicsCompletedRetentionMS =
this(WhiskConfig.kafkaTopicsCompletedRetentionMS)
- val kafkaTopicsCompletedSegmentBytes =
this(WhiskConfig.kafkaTopicsCompletedSegmentBytes)
-
val runtimesManifest = this(WhiskConfig.runtimesManifest)
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
val actionInvokeConcurrentLimit =
this(WhiskConfig.actionInvokeConcurrentLimit)
@@ -241,13 +234,6 @@ object WhiskConfig {
val runtimesManifest = "runtimes.manifest"
- val kafkaTopicsInvokerRetentionBytes = "kafka.topics.invoker.retentionBytes"
- val kafkaTopicsInvokerRetentionMS = "kafka.topics.invoker.retentionMS"
- val kafkaTopicsInvokerSegmentBytes = "kafka.topics.invoker.segmentBytes"
- val kafkaTopicsCompletedRetentionBytes =
"kafka.topics.completed.retentionBytes"
- val kafkaTopicsCompletedRetentionMS = "kafka.topics.completed.retentionMS"
- val kafkaTopicsCompletedSegmentBytes = "kafka.topics.completed.segmentBytes"
-
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
diff --git
a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 7772723317..ec938e11cb 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -34,6 +34,4 @@ trait MessagingProvider extends Spi {
maxPeek: Int = Int.MaxValue,
maxPollInterval: FiniteDuration = 5.minutes)(implicit
logging: Logging): MessageConsumer
def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging:
Logging): MessageProducer
- def ensureTopic(config: WhiskConfig, topic: String, topicConfig: Map[String,
String])(
- implicit logging: Logging): Boolean
}
diff --git
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index d41a0dc1a8..cc140b1a3d 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -39,7 +39,6 @@ import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.WhiskConfig
-import whisk.core.connector.MessagingProvider
import whisk.core.database.RemoteCacheInvalidation
import whisk.core.database.CacheChangeNotification
import whisk.core.entitlement._
@@ -200,28 +199,15 @@ object Controller {
require(args.length >= 1, "controller instance required")
val instance = args(0).toInt
- def abort(message: String) = {
- logger.error(this, message)
+ def abort() = {
+ logger.error(this, "Bad configuration, cannot start.")
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, 30.seconds)
sys.exit(1)
}
if (!config.isValid) {
- abort("Bad configuration, cannot start.")
- }
-
- val msgProvider = SpiLoader.get[MessagingProvider]
- if (!msgProvider.ensureTopic(
- config,
- "completed" + instance,
- Map(
- "numPartitions" -> "1",
- "replicationFactor" -> "1",
- "retention.bytes" -> config.kafkaTopicsCompletedRetentionBytes,
- "retention.ms" -> config.kafkaTopicsCompletedRetentionMS,
- "segment.bytes" -> config.kafkaTopicsCompletedSegmentBytes))) {
- abort(s"failure during msgProvider.ensureTopic for topic
completed$instance")
+ abort()
}
ExecManifest.initialize(config) match {
@@ -236,7 +222,8 @@ object Controller {
BasicHttpService.startService(controller.route, port)(actorSystem,
controller.materializer)
case Failure(t) =>
- abort(s"Invalid runtimes manifest: $t")
+ logger.error(this, s"Invalid runtimes manifest: $t")
+ abort()
}
}
}
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
index a8be73b2f9..4237ed61f2 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
@@ -24,7 +24,7 @@ import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
-import scala.concurrent.duration._
+import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import org.apache.kafka.clients.producer.RecordMetadata
@@ -51,7 +51,6 @@ import whisk.core.entity.Identity
import whisk.core.entity.InstanceId
import whisk.core.entity.UUID
import whisk.core.entity.WhiskAction
-import whisk.core.entity.size._
import whisk.core.entity.types.EntityStore
import whisk.spi.SpiLoader
@@ -342,16 +341,11 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
object LoadBalancerService {
def requiredProperties =
- kafkaHost ++
- Map(
- kafkaTopicsCompletedRetentionBytes -> 1024.MB.toBytes.toString,
- kafkaTopicsCompletedRetentionMS -> 1.hour.toMillis.toString,
- kafkaTopicsCompletedSegmentBytes -> 512.MB.toBytes.toString) ++
- Map(
- loadbalancerInvokerBusyThreshold -> null,
- controllerBlackboxFraction -> null,
- controllerLocalBookkeeping -> null,
- controllerSeedNodes -> null)
+ kafkaHost ++ Map(
+ loadbalancerInvokerBusyThreshold -> null,
+ controllerBlackboxFraction -> null,
+ controllerLocalBookkeeping -> null,
+ controllerSeedNodes -> null)
/** Memoizes the result of `f` for later use. */
def memoize[I, O](f: I => O): I => O = new
scala.collection.mutable.HashMap[I, O]() {
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 1c6853a96c..192100dcf5 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -43,7 +43,6 @@ import whisk.core.entity.ExecManifest
import whisk.core.entity.InstanceId
import whisk.core.entity.WhiskActivationStore
import whisk.core.entity.WhiskEntityStore
-import whisk.core.entity.size._
import whisk.http.BasicHttpService
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
@@ -62,10 +61,6 @@ object Invoker {
WhiskEntityStore.requiredProperties ++
WhiskActivationStore.requiredProperties ++
kafkaHost ++
- Map(
- kafkaTopicsInvokerRetentionBytes -> 1024.MB.toBytes.toString,
- kafkaTopicsInvokerRetentionMS -> 48.hour.toMillis.toString,
- kafkaTopicsInvokerSegmentBytes -> 512.MB.toBytes.toString) ++
Map(zookeeperHostName -> "", zookeeperHostPort -> "") ++
wskApiHost ++ Map(
dockerImageTag -> "latest",
@@ -187,17 +182,6 @@ object Invoker {
val invokerInstance = InstanceId(assignedInvokerId)
val msgProvider = SpiLoader.get[MessagingProvider]
- if (!msgProvider.ensureTopic(
- config,
- "invoker" + assignedInvokerId,
- Map(
- "numPartitions" -> "1",
- "replicationFactor" -> "1",
- "retention.bytes" -> config.kafkaTopicsInvokerRetentionBytes,
- "retention.ms" -> config.kafkaTopicsInvokerRetentionMS,
- "segment.bytes" -> config.kafkaTopicsInvokerSegmentBytes))) {
- abort(s"failure during msgProvider.ensureTopic for topic
invoker$assignedInvokerId")
- }
val producer = msgProvider.getProducer(config, ec)
val invoker = try {
new InvokerReactive(config, invokerInstance, producer)
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index 652dcb1542..9ab2973b78 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.Record
import common.StreamLogging
import whisk.common.Counter
@@ -77,7 +78,8 @@ class TestConnector(topic: String, override val maxPeek: Int,
allowMoreThanMax:
queue.synchronized {
if (queue.offer(msg)) {
logging.info(this, s"put: $msg")
- Future.successful(new RecordMetadata(new TopicPartition(topic, 0),
0, queue.size, -1, Long.box(-1L), -1, -1))
+ Future.successful(
+ new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size,
Record.NO_TIMESTAMP, -1, -1, -1))
} else {
logging.error(this, s"put failed: $msg")
Future.failed(new IllegalStateException("failed to write msg"))
@@ -89,7 +91,8 @@ class TestConnector(topic: String, override val maxPeek: Int,
allowMoreThanMax:
queue.synchronized {
if (queue.addAll(msgs)) {
logging.info(this, s"put: ${msgs.length} messages")
- Future.successful(new RecordMetadata(new TopicPartition(topic, 0),
0, queue.size, -1, Long.box(-1L), -1, -1))
+ Future.successful(
+ new RecordMetadata(new TopicPartition(topic, 0), 0, queue.size,
Record.NO_TIMESTAMP, -1, -1, -1))
} else {
logging.error(this, s"put failed: ${msgs.length} messages")
Future.failed(new IllegalStateException("failed to write msg"))
diff --git
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index a4af38ebca..0e06b9ff2a 100644
---
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -206,8 +206,7 @@ class InvokerSupervisionTests
sendActivationToInvoker
.when(activationMessage, invokerInstance)
- .returns(
- Future.successful(new RecordMetadata(new TopicPartition(invokerName,
0), 0L, 0L, 0L, Long.box(0L), 0, 0)))
+ .returns(Future.successful(new RecordMetadata(new
TopicPartition(invokerName, 0), 0L, 0L, 0L, 0L, 0, 0)))
supervisor ! msg
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services