markusthoemmes closed pull request #3193: No automatic topic creation in Kafka
URL: https://github.com/apache/incubator-openwhisk/pull/3193
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this is a foreign pull request (from a fork), the diff is supplied
below, as GitHub will not display it otherwise:
diff --git a/ansible/roles/kafka/tasks/deploy.yml
b/ansible/roles/kafka/tasks/deploy.yml
index 7c6fedae95..cf599195e5 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -23,6 +23,7 @@
"KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
"KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+ "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
ports:
- "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
diff --git a/tests/src/test/resources/application.conf
b/tests/src/test/resources/application.conf
index 039f2f4eb7..53f420d0bf 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -8,3 +8,17 @@ whisk.spi {
akka.http.client.idle-timeout = 90 s
akka.http.host-connection-pool.idle-timeout = 90 s
akka.http.host-connection-pool.client.idle-timeout = 90 s
+
+whisk {
+ # kafka related configuration
+ kafka {
+ replication-factor = 1
+ topics {
+ KafkaConnectorTestTopic {
+ segment-bytes = 536870912
+ retention-bytes = 1073741824
+ retention-ms = 3600000
+ }
+ }
+ }
+}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala
b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 6001c9cd58..d0c3019900 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -34,6 +34,7 @@ import common.{StreamLogging, TestUtils, WhiskProperties,
WskActorSystem}
import whisk.common.TransactionId
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.connector.kafka.KafkaProducerConnector
+import whisk.connector.kafka.KafkaMessagingProvider
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.ExecutionContextFactory
@@ -48,7 +49,17 @@ class KafkaConnectorTests extends FlatSpec with Matchers
with WskActorSystem wit
assert(config.isValid)
val groupid = "kafkatest"
- val topic = "Dinosaurs"
+ val topic = "KafkaConnectorTestTopic"
+
+ // Need to overwrite replication factor for tests that shut down and start
+ // Kafka instances intentionally. These tests will fail if there is more than
+ // one Kafka host but a replication factor of 1.
+ val kafkaHosts = config.kafkaHosts.split(",")
+ val replicationFactor = kafkaHosts.length / 2 + 1
+ System.setProperty("whisk.kafka.replication-factor",
replicationFactor.toString)
+ println(s"Create test topic '${topic}' with
replicationFactor=${replicationFactor}")
+ assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation
of topic ${topic} failed")
+
val sessionTimeout = 10 seconds
val maxPollInterval = 10 seconds
val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -128,9 +139,8 @@ class KafkaConnectorTests extends FlatSpec with Matchers
with WskActorSystem wit
}
}
- it should "send and receive a kafka message even after shutdown one of
instances" in {
- val kafkaHosts = config.kafkaHosts.split(",")
- if (kafkaHosts.length > 1) {
+ if (kafkaHosts.length > 1) {
+ it should "send and receive a kafka message even after shutdown one of
instances" in {
for (i <- 0 until kafkaHosts.length) {
val message = new Message { override val serialize =
Calendar.getInstance().getTime().toString }
val kafkaHost = kafkaHosts(i).split(":")(0)
@@ -138,8 +148,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers
with WskActorSystem wit
val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost,
"logs", s"kafka$i").stdout).length
commandComponent(kafkaHost, "stop", s"kafka$i")
- var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
- received.size should be(1)
+ sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have
size (1)
consumer.commit()
commandComponent(kafkaHost, "start", s"kafka$i")
@@ -149,8 +158,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers
with WskActorSystem wit
.length shouldBe prevCount + 1
}, 20, Some(1.second)) // wait until kafka is up
- received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
- received.size should be(1)
+ sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have
size (1)
consumer.commit()
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services