This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b03c0b8 Handle kafka exception thrown when creating the admin client.
(#4187)
b03c0b8 is described below
commit b03c0b8790a48ca21ba7bee781aa8bb595bc8025
Author: Martin Henke <[email protected]>
AuthorDate: Wed Jan 9 15:10:44 2019 +0100
Handle kafka exception thrown when creating the admin client. (#4187)
---
.../connector/kafka/KafkaMessagingProvider.scala | 51 ++++++++++++----------
1 file changed, 29 insertions(+), 22 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index 5cd719d..86d258e 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -60,30 +60,37 @@ object KafkaMessagingProvider extends MessagingProvider {
} getOrElse Map.empty)
val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String,
String]](ConfigKeys.kafkaCommon))
- val client = AdminClient.create(commonConfig +
(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts))
- val partitions = 1
- val nt = new NewTopic(topic, partitions,
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
-
- def createTopic(retries: Int = 5): Try[Unit] = {
- Try(client.createTopics(List(nt).asJava).values().get(topic).get())
- .map(_ => logging.info(this, s"created topic $topic"))
- .recoverWith {
- case CausedBy(_: TopicExistsException) =>
- Success(logging.info(this, s"topic $topic already existed"))
- case CausedBy(t: RetriableException) if retries > 0 =>
- logging.warn(this, s"topic $topic could not be created because of
$t, retries left: $retries")
- Thread.sleep(1.second.toMillis)
- createTopic(retries - 1)
- case t =>
- logging.error(this, s"ensureTopic for $topic failed due to $t")
- Failure(t)
- }
- }
- val result = createTopic()
+ Try(AdminClient.create(commonConfig +
(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)))
+ .flatMap(client => {
+ val partitions = 1
+ val nt = new NewTopic(topic, partitions,
kafkaConfig.replicationFactor).configs(topicConfig.asJava)
+
+ def createTopic(retries: Int = 5): Try[Unit] = {
+ Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+ .map(_ => logging.info(this, s"created topic $topic"))
+ .recoverWith {
+ case CausedBy(_: TopicExistsException) =>
+ Success(logging.info(this, s"topic $topic already existed"))
+ case CausedBy(t: RetriableException) if retries > 0 =>
+ logging.warn(this, s"topic $topic could not be created because
of $t, retries left: $retries")
+ Thread.sleep(1.second.toMillis)
+ createTopic(retries - 1)
+ case t =>
+ logging.error(this, s"ensureTopic for $topic failed due to $t")
+ Failure(t)
+ }
+ }
- client.close()
- result
+ val result = createTopic()
+ client.close()
+ result
+ })
+ .recoverWith {
+ case e =>
+ logging.error(this, s"ensureTopic for $topic failed due to $e")
+ Failure(e)
+ }
}
}