Repository: kafka Updated Branches: refs/heads/trunk 5b375d7bf -> 33d745e2d
http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala new file mode 100644 index 0000000..3d4b40c --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package kafka.server

import java.util.Properties

import kafka.utils.TestUtils
import org.apache.kafka.common.internals.TopicConstants
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.Assert._
import org.junit.Test

import scala.collection.JavaConverters._

/**
 * Tests for the Metadata request/response, exercising both version 0 and version 1
 * of the API against the brokers exposed by [[BaseRequestTest]].
 *
 * `servers`, `zkUtils` and `send` are not defined in this class; presumably they are
 * inherited from [[BaseRequestTest]] (or one of its parents) -- verify against that class.
 */
class MetadataRequestTest extends BaseRequestTest {

  /**
   * Assigns every broker a rack named `rack/<brokerId>` so that [[testRack]] can check
   * the rack reported in the metadata response against the broker id.
   *
   * @param properties the broker properties to amend in place
   */
  override def propertyOverrides(properties: Properties): Unit = {
    properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
  }

  /**
   * The controller id in a v1 response must match the active controller, both before
   * and after the controller broker is restarted.
   */
  @Test
  def testControllerId(): Unit = {
    val controllerServer = servers.find(_.kafkaController.isActive())
      .getOrElse(throw new AssertionError("No active controller found before failover"))
    val controllerId = controllerServer.config.brokerId
    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)

    assertEquals("Controller id should match the active controller",
      controllerId, metadataResponse.controller.id)

    // Fail over the controller
    controllerServer.shutdown()
    controllerServer.startup()

    // NOTE(review): this lookup assumes a different broker has already become the active
    // controller by the time startup() returns -- confirm there is no election race here.
    val controllerServer2 = servers.find(_.kafkaController.isActive())
      .getOrElse(throw new AssertionError("No active controller found after failover"))
    val controllerId2 = controllerServer2.config.brokerId
    assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)

    // The response may lag behind the failover, so poll until it reports the new controller.
    TestUtils.waitUntilTrue(() => {
      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
      controllerServer2.apis.brokerId == metadataResponse2.controller.id
    }, "Controller id should match the active controller after failover", 5000)
  }

  /**
   * Every broker in a v1 response must report the rack configured in
   * [[propertyOverrides]].
   */
  @Test
  def testRack(): Unit = {
    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    // Validate rack matches what's set in propertyOverrides() above
    metadataResponse.brokers.asScala.foreach { broker =>
      assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack)
    }
  }

  /**
   * A v1 response must flag the group metadata topic as internal and an ordinary
   * topic as not internal.
   */
  @Test
  def testIsInternal(): Unit = {
    val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
    val notInternalTopic = "notInternal"
    // create the topics
    TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
    TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)

    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)

    val topicMetadata = metadataResponse.topicMetadata.asScala
    val internalTopicMetadata = topicMetadata.find(_.topic == internalTopic)
      .getOrElse(throw new AssertionError(s"Response should contain topic $internalTopic"))
    val notInternalTopicMetadata = topicMetadata.find(_.topic == notInternalTopic)
      .getOrElse(throw new AssertionError(s"Response should contain topic $notInternalTopic"))

    assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
    assertFalse("notInternalTopic should not show isInternal", notInternalTopicMetadata.isInternal)
  }

  /**
   * In v1 an empty topic list means "no topics": the response must contain neither
   * errors nor topic metadata even though topics exist.
   */
  @Test
  def testNoTopicsRequest(): Unit = {
    // create some topics
    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)

    // v0, Doesn't support a "no topics" request
    // v1, Empty list represents "no topics"
    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 1)
    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
    assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
  }

  /**
   * "All topics" is requested with an empty list in v0 and with
   * `MetadataRequest.allTopics()` in v1; both must return the two created topics.
   */
  @Test
  def testAllTopicsRequest(): Unit = {
    // create some topics
    TestUtils.createTopic(zkUtils, "t1", 3, 2, servers)
    TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)

    // v0, Empty list represents all topics
    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava), 0)
    assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
    assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())

    // v1, Null represents all topics
    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
    assertTrue("V1 Response should have no errors", metadataResponseV1.errors.isEmpty)
    assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
  }

  /**
   * After a non-leader replica is shut down, v0 must drop it from the replica list and
   * flag the partition with REPLICA_NOT_AVAILABLE, while v1 must keep it in the replica
   * list with no partition error. Neither version may list the down broker among the
   * live brokers.
   */
  @Test
  def testReplicaDownResponse(): Unit = {
    val replicaDownTopic = "replicaDown"
    val replicaCount = 3

    // create a topic with 3 replicas
    TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)

    // Look up the current leader and replica assignment of the single partition
    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
    val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    val leaderId = partitionMetadata.leader.id
    val replicaIds = partitionMetadata.replicas.asScala.map(_.id)

    // Kill a replica node that is not the leader
    val downNode = servers.find { server =>
      val serverId = server.apis.brokerId
      serverId != leaderId && replicaIds.contains(serverId)
    }.getOrElse(throw new AssertionError("No non-leader replica found to shut down"))
    val downNodeId = downNode.apis.brokerId
    downNode.shutdown()

    // Poll until v1 reports the down replica with an empty host and port -1. `exists`
    // keeps polling (rather than throwing) if the replica is momentarily absent.
    TestUtils.waitUntilTrue(() => {
      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
      metadata.replicas.asScala.exists { replica =>
        replica.id == downNodeId && replica.host == "" && replica.port == -1
      }
    }, "Replica was not found down", 5000)

    // Validate version 0 still filters unavailable replicas and contains error
    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 0)
    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
    assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
    // Compare ids with ids: checking the server object against a list of ids can never match.
    assertFalse("The downed broker should not be in the brokers list", v0BrokerIds.contains(downNodeId))
    assertEquals("Response should have one topic", 1, v0MetadataResponse.topicMetadata.size)
    val v0PartitionMetadata = v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    assertEquals("PartitionMetadata should have an error", Errors.REPLICA_NOT_AVAILABLE, v0PartitionMetadata.error)
    assertEquals(s"Response should have ${replicaCount - 1} replicas", replicaCount - 1, v0PartitionMetadata.replicas.size)

    // Validate version 1 returns unavailable replicas with no error
    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
    assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
    assertFalse("The downed broker should not be in the brokers list", v1BrokerIds.contains(downNodeId))
    assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
    val v1PartitionMetadata = v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
    assertEquals("PartitionMetadata should have no errors", Errors.NONE, v1PartitionMetadata.error)
    assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
  }

  /**
   * Sends `request` as a Metadata API call at the given protocol `version` and parses
   * the raw reply with the same version.
   *
   * @param request the metadata request to send
   * @param version the Metadata API version used for both sending and parsing
   * @return the parsed metadata response
   */
  private def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse = {
    val response = send(request, ApiKeys.METADATA, version)
    MetadataResponse.parse(response, version)
  }
}
