Repository: kafka Updated Branches: refs/heads/trunk f90321553 -> 05d00b5ac
KAFKA-4032; Uncaught exceptions when autocreating topics handled by adding a catch all for any unhandled exception. Because the jira specifically mentions the InvalidReplicationFactor exception, a test was added for that specific case. Author: Grant Henke <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1739 from granthenke/create-errors Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05d00b5a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05d00b5a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05d00b5a Branch: refs/heads/trunk Commit: 05d00b5aca2e1e59ad685a3f051d2ab022f75acc Parents: f903215 Author: Grant Henke <[email protected]> Authored: Tue Aug 23 00:14:44 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Aug 23 00:14:44 2016 +0100 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- .../integration/BaseTopicMetadataTest.scala | 25 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6eb574f..0a5258e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -643,8 +643,8 @@ class KafkaApis(val requestChannel: RequestChannel, case e: TopicExistsException => // let it go, possibly another broker created this topic new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic), java.util.Collections.emptyList()) - case itex: InvalidTopicException => - new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, Topic.isInternal(topic), + case ex: Throwable => // Catch all to prevent unhandled errors + new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic), java.util.Collections.emptyList()) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 7c9f3ae..24ed954 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -133,6 +133,31 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { } @Test + def testAutoCreateTopicWithInvalidReplication { + val adHocProps = createBrokerConfig(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile) + // Set default replication higher than the number of live brokers + adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") + // start adHoc brokers with replication factor too high + val adHocServer = createServer(new KafkaConfig(adHocProps)) + // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use + // `securityProtocol` instead of PLAINTEXT below + val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName, + adHocServer.boundPort(SecurityProtocol.PLAINTEXT)) + + // auto create topic on "bad" endpoint + val topic = "testAutoCreateTopic" + val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic", + 2000,0).topicsMetadata + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode) + assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) + assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic) + assertEquals(0, topicsMetadata.head.partitionsMetadata.size) + + adHocServer.shutdown() + } + + @Test def testAutoCreateTopicWithCollision { // auto create topic val topic1 = "testAutoCreate_Topic"
