Repository: kafka Updated Branches: refs/heads/trunk cd8bd606c -> 858047a12
KAFKA-3402; Restore behaviour of MetadataCache.getTopicMetadata when unsupported security protocol is received Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson, Grant Henke Closes #1073 from ijuma/kafka-3402-restore-get-topic-metadata-behaviour Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/858047a1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/858047a1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/858047a1 Branch: refs/heads/trunk Commit: 858047a12ba3a7d426178c63226dd2c7509f20dd Parents: cd8bd60 Author: Ismael Juma <[email protected]> Authored: Wed Mar 16 16:15:55 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Wed Mar 16 16:15:55 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/server/MetadataCache.scala | 7 +- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../unit/kafka/server/MetadataCacheTest.scala | 90 ++++++++++++++------ 3 files changed, 72 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 6df261c..b23ecbe 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -24,7 +24,7 @@ import scala.collection.{Seq, Set, mutable} import scala.collection.JavaConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ -import kafka.common.TopicAndPartition +import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition} import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.utils.CoreUtils._ import kafka.utils.Logging @@ -55,7 +55,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] = - aliveNodes.get(brokerId).flatMap(_.get(protocol)) + aliveNodes.get(brokerId).map { nodeMap => + nodeMap.getOrElse(protocol, + throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`")) + } private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[MetadataResponse.PartitionMetadata]] = { cache.get(topic).map { partitions => http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 33027e7..b09c541 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -187,7 +187,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { private def createUpdateMetadataRequest = { val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, - Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava + Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers) } http://git-wip-us.apache.org/repos/asf/kafka/blob/858047a1/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index f3f0c87..dcc310f 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -19,10 +19,11 @@ package kafka.server import java.util import util.Arrays.asList +import kafka.common.BrokerEndPointNotAvailableException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.UpdateMetadataRequest -import org.apache.kafka.common.requests.UpdateMetadataRequest.{PartitionState, Broker, EndPoint} +import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState} import org.junit.Test import org.junit.Assert._ @@ -49,10 +50,18 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set( - new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava), - new Broker(1, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava), - new Broker(2, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + + def securityProtocolToEndPoint(brokerId: Int): Map[SecurityProtocol, EndPoint] = { + val host = s"foo-$brokerId" + Map( + SecurityProtocol.PLAINTEXT -> new EndPoint(host, 9092), + SecurityProtocol.SSL -> new EndPoint(host, 9093) + ) + } + + val brokers = (0 to 2).map { brokerId => + new Broker(brokerId, securityProtocolToEndPoint(brokerId).asJava, "rack1") + }.toSet val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 0, 0, asList(0), zkVersion, asSet(0)), @@ -62,24 +71,32 @@ class MetadataCacheTest { val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava) cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) - assertEquals(1, topicMetadatas.size) + for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { + val topicMetadatas = cache.getTopicMetadata(Set(topic), securityProtocol) + assertEquals(1, topicMetadatas.size) + + val topicMetadata = topicMetadatas.head + assertEquals(Errors.NONE, topicMetadata.error) + assertEquals(topic, topicMetadata.topic) + + val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) + assertEquals(3, partitionMetadatas.size) + + for (i <- 0 to 2) { + val partitionMetadata = partitionMetadatas(i) + assertEquals(Errors.NONE, partitionMetadata.error) + assertEquals(i, partitionMetadata.partition) + val leader = partitionMetadata.leader + assertEquals(i, leader.id) + val endPoint = securityProtocolToEndPoint(partitionMetadata.leader.id)(securityProtocol) + assertEquals(endPoint.host, leader.host) + assertEquals(endPoint.port, leader.port) + assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id)) + assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id)) + } - val topicMetadata = topicMetadatas.head - assertEquals(Errors.NONE, topicMetadata.error) - assertEquals(topic, topicMetadata.topic) - - val partitionMetadatas = topicMetadata.partitionMetadata.asScala.sortBy(_.partition) - assertEquals(3, partitionMetadatas.size) - - for (i <- 0 to 2) { - val partitionMetadata = partitionMetadatas(i) - assertEquals(Errors.NONE, partitionMetadata.error) - assertEquals(i, partitionMetadata.partition) - assertEquals(i, partitionMetadata.leader.id) - assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id)) - assertEquals(List(i), partitionMetadata.replicas.asScala.map(_.id)) } + } @Test @@ -91,7 +108,7 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null)) val leader = 1 val leaderEpoch = 1 @@ -127,7 +144,7 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null)) // replica 1 is not available val leader = 0 @@ -166,7 +183,7 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava)) + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "rack1")) // replica 1 is not available val leader = 0 @@ -196,4 +213,29 @@ class MetadataCacheTest { assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) } + @Test + def getTopicMetadataWithNonSupportedSecurityProtocol() { + val topic = "topic" + val cache = new MetadataCache(1) + val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "")) + val controllerEpoch = 1 + val leader = 0 + val leaderEpoch = 0 + val replicas = asSet[Integer](0) + val isr = asList[Integer](0, 1) + val partitionStates = Map( + new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) + val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava, brokers.asJava) + cache.updateCache(15, updateMetadataRequest) + + try { + val result = cache.getTopicMetadata(Set(topic), SecurityProtocol.SSL) + fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead") + } + catch { + case e: BrokerEndPointNotAvailableException => //expected + } + + } + }
