Repository: kafka
Updated Branches:
  refs/heads/trunk 5b375d7bf -> 33d745e2d


http://git-wip-us.apache.org/repos/asf/kafka/blob/33d745e2/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
new file mode 100644
index 0000000..3d4b40c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -0,0 +1,168 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class MetadataRequestTest extends BaseRequestTest {
+
+  /**
+    * Sets the rack property to "rack/<id>", where <id> is the value of the broker id
+    * property already present in the given properties.
+    *
+    * @param properties broker configuration, modified in place
+    */
+  override def propertyOverrides(properties: Properties): Unit = {
+    val brokerId = properties.getProperty(KafkaConfig.BrokerIdProp)
+    properties.setProperty(KafkaConfig.RackProp, s"rack/$brokerId")
+  }
+
+  /**
+    * Checks that a v1 metadata response reports the id of the active controller, both
+    * initially and after the controller broker has been shut down and started again.
+    */
+  @Test
+  def testControllerId(): Unit = {
+    val version: Short = 1
+    // Looks up whichever server currently has an active controller; fails if there is none
+    def activeController = servers.find(_.kafkaController.isActive()).get
+
+    val initialController = activeController
+    val initialControllerId = initialController.config.brokerId
+    val initialResponse = sendMetadataRequest(MetadataRequest.allTopics(), version)
+
+    assertEquals("Controller id should match the active controller",
+      initialControllerId, initialResponse.controller.id)
+
+    // Fail over the controller by bouncing the broker that currently hosts it
+    initialController.shutdown()
+    initialController.startup()
+
+    val newController = activeController
+    val newControllerId = newController.config.brokerId
+    assertNotEquals("Controller id should switch to a new broker",
+      initialControllerId, newControllerId)
+
+    // Poll, since the metadata response may not reflect the new controller right away
+    TestUtils.waitUntilTrue(() => {
+      val response = sendMetadataRequest(MetadataRequest.allTopics(), version)
+      newController.apis.brokerId == response.controller.id
+    }, "Controller id should match the active controller after failover", 5000)
+  }
+
+  /**
+    * Checks that every broker listed in a v1 metadata response carries the rack
+    * "rack/<broker id>".
+    */
+  @Test
+  def testRack(): Unit = {
+    val brokers = sendMetadataRequest(MetadataRequest.allTopics(), 1).brokers.asScala
+    // The expected value mirrors what propertyOverrides() above puts into the rack property
+    for (broker <- brokers)
+      assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack)
+  }
+
+  /**
+    * Checks that a v1 metadata response flags the group metadata topic as internal and an
+    * ordinary topic as not internal.
+    */
+  @Test
+  def testIsInternal() {
+    val internalTopic = TopicConstants.GROUP_METADATA_TOPIC_NAME
+    val notInternalTopic = "notInternal"
+    // create the topics
+    TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
+    TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)
+
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
+
+    // Both topics were just created, so each lookup is expected to succeed; a missing topic
+    // fails the test through the .get call
+    val topicMetadata = metadataResponse.topicMetadata.asScala
+    val internalTopicMetadata = topicMetadata.find(_.topic == internalTopic).get
+    val notInternalTopicMetadata = topicMetadata.find(_.topic == notInternalTopic).get
+
+    assertTrue("internalTopic should show isInternal", internalTopicMetadata.isInternal)
+    assertFalse("notInternalTopic should not show isInternal",
+      notInternalTopicMetadata.isInternal)
+  }
+
+  /**
+    * Checks that a v1 metadata request carrying an empty topic list comes back without
+    * errors and without any topic metadata, even though topics exist.
+    */
+  @Test
+  def testNoTopicsRequest(): Unit = {
+    // Create some topics so that there is something that could have been returned
+    Seq("t1", "t2").foreach { topic =>
+      TestUtils.createTopic(zkUtils, topic, 3, 2, servers)
+    }
+
+    // v0 cannot express a "no topics" request; in v1 an empty list represents "no topics"
+    val noTopics = List.empty[String].asJava
+    val response = sendMetadataRequest(new MetadataRequest(noTopics), 1)
+
+    assertTrue("Response should have no errors", response.errors.isEmpty)
+    assertTrue("Response should have no topics", response.topicMetadata.isEmpty)
+  }
+
+  /**
+    * Checks the two ways of asking for all topics: an empty topic list in v0 and
+    * MetadataRequest.allTopics() in v1. Both are expected to return the two created topics.
+    */
+  @Test
+  def testAllTopicsRequest(): Unit = {
+    // Create the two topics that both requests should report
+    for (topic <- Seq("t1", "t2"))
+      TestUtils.createTopic(zkUtils, topic, 3, 2, servers)
+
+    // v0: an empty list represents all topics
+    val emptyTopicList = List.empty[String].asJava
+    val responseV0 = sendMetadataRequest(new MetadataRequest(emptyTopicList), 0)
+    assertTrue("V0 Response should have no errors", responseV0.errors.isEmpty)
+    assertEquals("V0 Response should have 2 (all) topics", 2, responseV0.topicMetadata.size())
+
+    // v1: null represents all topics
+    val responseV1 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    assertTrue("V1 Response should have no errors", responseV1.errors.isEmpty)
+    assertEquals("V1 Response should have 2 (all) topics", 2, responseV1.topicMetadata.size())
+  }
+
+  /**
+    * Checks how metadata responses describe a partition once one of its non-leader replicas
+    * has been shut down:
+    *  - v0 is expected to drop the unavailable replica and report REPLICA_NOT_AVAILABLE
+    *  - v1 is expected to keep all replicas and report no error
+    * In both versions the downed broker must be absent from the brokers list.
+    */
+  @Test
+  def testReplicaDownResponse() {
+    val replicaDownTopic = "replicaDown"
+    val replicaCount = 3
+
+    // create a topic with 3 replicas
+    TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)
+
+    // Look up the current leader and replicas so that a non-leader replica can be picked
+    val metadataResponse =
+      sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+    val partitionMetadata =
+      metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    val leaderId = partitionMetadata.leader.id
+    val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
+
+    // Kill a replica node that is not the leader
+    val downNode = servers.find { server =>
+      val serverId = server.apis.brokerId
+      serverId != leaderId && replicaIds.contains(serverId)
+    }.get
+    // Remember the id: broker lists hold ids, so comparing them against the server object
+    // itself would never match and the "not in brokers list" checks would pass vacuously
+    val downNodeId = downNode.apis.brokerId
+    downNode.shutdown()
+
+    // Wait until the response reports the downed replica with an empty host and port -1
+    TestUtils.waitUntilTrue(() => {
+      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+      val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
+      val replica = metadata.replicas.asScala.find(_.id == downNodeId).get
+      replica.host == "" && replica.port == -1
+    }, "Replica was not found down", 5000)
+
+    // Validate version 0 still filters unavailable replicas and contains error
+    val v0MetadataResponse =
+      sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 0)
+    val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
+    assertFalse("The downed broker should not be in the brokers list",
+      v0BrokerIds.contains(downNodeId))
+    assertEquals("Response should have one topic", 1, v0MetadataResponse.topicMetadata.size)
+    val v0PartitionMetadata =
+      v0MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertEquals("PartitionMetadata should have an error",
+      Errors.REPLICA_NOT_AVAILABLE, v0PartitionMetadata.error)
+    assertEquals(s"Response should have ${replicaCount - 1} replicas",
+      replicaCount - 1, v0PartitionMetadata.replicas.size)
+
+    // Validate version 1 returns unavailable replicas with no error
+    val v1MetadataResponse =
+      sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava), 1)
+    val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
+    assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
+    assertFalse("The downed broker should not be in the brokers list",
+      v1BrokerIds.contains(downNodeId))
+    assertEquals("Response should have one topic", 1, v1MetadataResponse.topicMetadata.size)
+    val v1PartitionMetadata =
+      v1MetadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
+    assertEquals("PartitionMetadata should have no errors", Errors.NONE,
+      v1PartitionMetadata.error)
+    assertEquals(s"Response should have $replicaCount replicas", replicaCount,
+      v1PartitionMetadata.replicas.size)
+  }
+
+  /**
+    * Sends a metadata request at the given protocol version and parses the reply.
+    *
+    * @param request the metadata request to send
+    * @param version the METADATA api version, used both for sending and for parsing
+    * @return the response parsed with the same version
+    */
+  private def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse =
+    MetadataResponse.parse(send(request, ApiKeys.METADATA, version), version)
+}

Reply via email to