markusthoemmes closed pull request #3193: No automatic topic creation in Kafka
URL: https://github.com/apache/incubator-openwhisk/pull/3193
 
 
   

This PR was merged from a forked repository. Because GitHub does not show
the original diff of a fork's pull request once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 7c6fedae95..cf599195e5 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -23,6 +23,7 @@
       "KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
       "KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
       "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
+      "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
     ports:
       - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:9092"
 
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
index 039f2f4eb7..53f420d0bf 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -8,3 +8,17 @@ whisk.spi {
 akka.http.client.idle-timeout = 90 s
 akka.http.host-connection-pool.idle-timeout = 90 s
 akka.http.host-connection-pool.client.idle-timeout = 90 s
+
+whisk {
+    # kafka related configuration
+    kafka {
+        replication-factor = 1
+        topics {
+            KafkaConnectorTestTopic {
+                segment-bytes   =  536870912
+                retention-bytes = 1073741824
+                retention-ms    = 3600000
+            }
+        }
+    }
+}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 6001c9cd58..d0c3019900 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -34,6 +34,7 @@ import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import whisk.common.TransactionId
 import whisk.connector.kafka.KafkaConsumerConnector
 import whisk.connector.kafka.KafkaProducerConnector
+import whisk.connector.kafka.KafkaMessagingProvider
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.ExecutionContextFactory
@@ -48,7 +49,17 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   assert(config.isValid)
 
   val groupid = "kafkatest"
-  val topic = "Dinosaurs"
+  val topic = "KafkaConnectorTestTopic"
+
+  // Need to overwrite replication factor for tests that shut down and start
+  // Kafka instances intentionally. These tests will fail if there is more than
+  // one Kafka host but a replication factor of 1.
+  val kafkaHosts = config.kafkaHosts.split(",")
+  val replicationFactor = kafkaHosts.length / 2 + 1
+  System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
+  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+
   val sessionTimeout = 10 seconds
   val maxPollInterval = 10 seconds
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
@@ -128,9 +139,8 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
     }
   }
 
-  it should "send and receive a kafka message even after shutdown one of instances" in {
-    val kafkaHosts = config.kafkaHosts.split(",")
-    if (kafkaHosts.length > 1) {
+  if (kafkaHosts.length > 1) {
+    it should "send and receive a kafka message even after shutdown one of instances" in {
       for (i <- 0 until kafkaHosts.length) {
         val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
         val kafkaHost = kafkaHosts(i).split(":")(0)
@@ -138,8 +148,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
 
         commandComponent(kafkaHost, "stop", s"kafka$i")
-        var received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
-        received.size should be(1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
         consumer.commit()
 
         commandComponent(kafkaHost, "start", s"kafka$i")
@@ -149,8 +158,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
             .length shouldBe prevCount + 1
         }, 20, Some(1.second)) // wait until kafka is up
 
-        received = sendAndReceiveMessage(message, 30 seconds, 30 seconds)
-        received.size should be(1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
         consumer.commit()
       }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to