cbickel closed pull request #3686: Refactor `ensureTopic` to expose failure details. URL: https://github.com/apache/incubator-openwhisk/pull/3686
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/common/CausedBy.scala b/common/scala/src/main/scala/whisk/common/CausedBy.scala new file mode 100644 index 0000000000..caa2ba4d0d --- /dev/null +++ b/common/scala/src/main/scala/whisk/common/CausedBy.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.common + +/** + * Helper to match on exceptions caused by other exceptions. + * + * Use this like: + * + * ``` + * try { + * block() + * } catch { + * case CausedBy(internalException: MyFancyException) => ... + * } + * ``` + */ +object CausedBy { + def unapply(e: Throwable): Option[Throwable] = Option(e.getCause) +} diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala index e939a4637e..7351b64f32 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -18,18 +18,18 @@ package whisk.connector.kafka import java.util.Properties -import java.util.concurrent.ExecutionException import akka.actor.ActorSystem import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} import org.apache.kafka.common.errors.TopicExistsException import pureconfig._ -import whisk.common.Logging +import whisk.common.{CausedBy, Logging} import whisk.core.{ConfigKeys, WhiskConfig} import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider} import scala.collection.JavaConverters._ import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success, Try} case class KafkaConfig(replicationFactor: Short) @@ -47,31 +47,28 @@ object KafkaMessagingProvider extends MessagingProvider { def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer = new KafkaProducerConnector(config.kafkaHosts) - def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = { - val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka) - val tc = KafkaConfiguration.configMapToKafkaConfig( - loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig")) + def ensureTopic(config: WhiskConfig, topic: String, topicConfigKey: String)(implicit logging: Logging): Try[Unit] = { + val kafkaConfig = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka) + val topicConfig = KafkaConfiguration.configMapToKafkaConfig( + loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + "." + topicConfigKey)) - val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts) val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) - val client = AdminClient.create(baseConfig ++ commonConfig) - val numPartitions = 1 - val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava) - val results = client.createTopics(List(nt).asJava) - try { - results.values().get(topic).get() - logging.info(this, s"created topic $topic") - true - } catch { - case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] => - logging.info(this, s"topic $topic already existed") - true - case e: Exception => - logging.error(this, s"ensureTopic for $topic failed due to $e") - false - } finally { - client.close() - } + val client = AdminClient.create(commonConfig + (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)) + val partitions = 1 + val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) + + val result = Try(client.createTopics(List(nt).asJava).values().get(topic).get()) + .map(_ => logging.info(this, s"created topic $topic")) + .recoverWith { + case CausedBy(_: TopicExistsException) => + Success(logging.info(this, s"topic $topic already existed")) + case t => + logging.error(this, s"ensureTopic for $topic failed due to $t") + Failure(t) + } + + client.close() + result } } diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala index 8ec1f5a55d..6e6b24a212 100644 --- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala @@ -25,6 +25,8 @@ import whisk.common.Logging import whisk.core.WhiskConfig import whisk.spi.Spi +import scala.util.Try + /** * An Spi for providing Messaging implementations. */ @@ -36,5 +38,5 @@ trait MessagingProvider extends Spi { maxPeek: Int = Int.MaxValue, maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer - def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean + def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Try[Unit] } diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index c1c739075b..8096711b1b 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -216,18 +216,16 @@ object Controller { } val msgProvider = SpiLoader.get[MessagingProvider] - if (!msgProvider.ensureTopic(config, topic = "completed" + instance, topicConfig = "completed")) { - abort(s"failure during msgProvider.ensureTopic for topic completed$instance") - } - if (!msgProvider.ensureTopic(config, topic = "health", topicConfig = "health")) { - abort(s"failure during msgProvider.ensureTopic for topic health") - } - if (!msgProvider.ensureTopic(config, topic = "cacheInvalidation", topicConfig = "cache-invalidation")) { - abort(s"failure during msgProvider.ensureTopic for topic cacheInvalidation") - } - if (!msgProvider.ensureTopic(config, topic = "events", topicConfig = "events")) { - abort(s"failure during msgProvider.ensureTopic for topic events") + Map( + "completed" + instance -> "completed", + "health" -> "health", + "cacheInvalidation" -> "cache-invalidation", + "events" -> "events").foreach { + case (topic, topicConfigurationKey) => + if (msgProvider.ensureTopic(config, topic, topicConfigurationKey).isFailure) { + abort(s"failure during msgProvider.ensureTopic for topic $topic") + } } ExecManifest.initialize(config) match { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 4a69d3cf99..134aa05ed5 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -168,7 +168,7 @@ object Invoker { val invokerInstance = InstanceId(assignedInvokerId, invokerName) val msgProvider = SpiLoader.get[MessagingProvider] - if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) { + if (msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker").isFailure) { abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId") } val producer = msgProvider.getProducer(config) diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala index 3f91b59fd2..57e2e56ad2 100644 --- a/tests/src/test/scala/services/KafkaConnectorTests.scala +++ b/tests/src/test/scala/services/KafkaConnectorTests.scala @@ -18,6 +18,7 @@ package services import java.io.File +import java.nio.charset.StandardCharsets import java.util.Calendar import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem} @@ -61,11 +62,9 @@ class KafkaConnectorTests val kafkaHosts: Array[String] = config.kafkaHosts.split(",") val replicationFactor: Int = kafkaHosts.length / 2 + 1 System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString) - println(s"Create test topic '$topic' with replicationFactor=$replicationFactor") - assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed") println(s"Create test topic '$topic' with replicationFactor=$replicationFactor") - assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed") + KafkaMessagingProvider.ensureTopic(config, topic, topic) shouldBe 'success val producer = new KafkaProducerConnector(config.kafkaHosts) val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic) @@ -94,7 +93,8 @@ class KafkaConnectorTests val sent = Await.result(producer.send(topic, message), waitForSend) println(s"Successfully sent message to topic: $sent") println(s"Receiving message from topic.") - val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") } + val received = + consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, StandardCharsets.UTF_8) } val end = java.lang.System.currentTimeMillis val elapsed = end - start println(s"Received ${received.size}. Took $elapsed msec: $received") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services