This is an automated email from the ASF dual-hosted git repository.

mcdan pushed a commit to branch mcdan/topic-check-before-create
in repository https://gitbox.apache.org/repos/asf/openwhisk.git

commit 18751156b20887447d0341d7c35b365e90700a7c
Author: dan mcweeney <[email protected]>
AuthorDate: Thu Jan 23 13:16:11 2020 -0500

    Check to see if the user can see the topic before creating it,
    this allows lower privilege users to be set for the controller
    and invoker.
---
 .../connector/kafka/KafkaMessagingProvider.scala   | 32 +++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index b54d684..addf680 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -68,19 +68,25 @@ object KafkaMessagingProvider extends MessagingProvider {
         val nt = new NewTopic(topic, partitions, 
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
 
         def createTopic(retries: Int = 5): Try[Unit] = {
-          Try(client.createTopics(List(nt).asJava).values().get(topic).get())
-            .map(_ => logging.info(this, s"created topic $topic"))
-            .recoverWith {
-              case CausedBy(_: TopicExistsException) =>
-                Success(logging.info(this, s"topic $topic already existed"))
-              case CausedBy(t: RetriableException) if retries > 0 =>
-                logging.warn(this, s"topic $topic could not be created because 
of $t, retries left: $retries")
-                Thread.sleep(1.second.toMillis)
-                createTopic(retries - 1)
-              case t =>
-                logging.error(this, s"ensureTopic for $topic failed due to $t")
-                Failure(t)
-            }
+          Try(client.listTopics().names().get())
+            .map(topics =>
+              if (topics.contains(topic)) {
+                Success(logging.info(this, s"$topic already exists and the 
user can see it, skipping creation."))
+              } else {
+                
Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+                  .map(_ => logging.info(this, s"created topic $topic"))
+                  .recoverWith {
+                    case CausedBy(_: TopicExistsException) =>
+                      Success(logging.info(this, s"topic $topic already 
existed"))
+                    case CausedBy(t: RetriableException) if retries > 0 =>
+                      logging.warn(this, s"topic $topic could not be created 
because of $t, retries left: $retries")
+                      Thread.sleep(1.second.toMillis)
+                      createTopic(retries - 1)
+                    case t =>
+                      logging.error(this, s"ensureTopic for $topic failed due 
to $t")
+                      Failure(t)
+                  }
+            })
         }
 
         val result = createTopic()

Reply via email to