http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6217302..cf1a5a6 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -19,12 +19,14 @@ package kafka.server import java.util.Properties +import kafka.api.ApiVersion +import kafka.cluster.EndPoint import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.Utils import org.apache.kafka.common.config.ConfigDef - -import scala.collection.{JavaConversions, Map} +import org.apache.kafka.common.protocol.SecurityProtocol +import scala.collection.{immutable, JavaConversions, Map} object Defaults { /** ********* Zookeeper Configuration ***********/ @@ -101,6 +103,8 @@ object Defaults { val LeaderImbalancePerBrokerPercentage = 10 val LeaderImbalanceCheckIntervalSeconds = 300 val UncleanLeaderElectionEnable = true + val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString + val InterBrokerProtocolVersion = ApiVersion.latestVersion.toString /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetries = 3 @@ -142,8 +146,10 @@ object KafkaConfig { /** ********* Socket Server Configuration ***********/ val PortProp = "port" val HostNameProp = "host.name" + val ListenersProp = "listeners" val AdvertisedHostNameProp: String = "advertised.host.name" val AdvertisedPortProp = "advertised.port" + val AdvertisedListenersProp = "advertised.listeners" val SocketSendBufferBytesProp = "socket.send.buffer.bytes" val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" val SocketRequestMaxBytesProp = "socket.request.max.bytes" @@ -207,6 +213,8 @@ object KafkaConfig { val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage" val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds" val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" + val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol" + val InterBrokerProtocolVersionProp = "inter.broker.protocol.version" /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" @@ -246,6 +254,12 @@ object KafkaConfig { /** ********* Socket Server Configuration ***********/ val PortDoc = "the port to listen and accept connections on" val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces" + val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and their protocols.\n" + + " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + + " Leave hostname empty to bind to default interface.\n" + + " Examples of legal listener lists:\n" + + " PLAINTEXT://myhost:9092,TRACE://:9091\n" + + " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n" val AdvertisedHostNameDoc = "Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " + "need to be different from the interface to which the broker binds. If this is not set, " + "it will use the value for \"host.name\" if configured. Otherwise " + @@ -253,6 +267,9 @@ object KafkaConfig { val AdvertisedPortDoc = "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " + "need to be different from the port to which the broker binds. If this is not set, " + "it will publish the same port that the broker binds to." + val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + + " In IaaS environments, this may need to be different from the interface to which the broker binds." + + " If this is not set, the value for \"listeners\" will be used." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets" val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets" val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request" @@ -319,6 +336,10 @@ object KafkaConfig { val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage." val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" + val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text." + val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + + " This is typically bumped after all brokers were upgraded to a new version.\n" + + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list." /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." @@ -350,7 +371,6 @@ object KafkaConfig { import ConfigDef.ValidString._ import ConfigDef.Type._ import ConfigDef.Importance._ - import java.util.Arrays.asList new ConfigDef() @@ -372,8 +392,10 @@ object KafkaConfig { /** ********* Socket Server Configuration ***********/ .define(PortProp, INT, Defaults.Port, HIGH, PortDoc) .define(HostNameProp, STRING, Defaults.HostName, HIGH, HostNameDoc) + .define(ListenersProp, STRING, HIGH, ListenersDoc, false) .define(AdvertisedHostNameProp, STRING, HIGH, AdvertisedHostNameDoc, false) .define(AdvertisedPortProp, INT, HIGH, AdvertisedPortDoc, false) + .define(AdvertisedListenersProp, STRING, HIGH, AdvertisedListenersDoc, false) .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc) @@ -439,7 +461,8 @@ object KafkaConfig { .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc) .define(LeaderImbalanceCheckIntervalSecondsProp, INT, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) - + .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) + .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) @@ -490,8 +513,10 @@ object KafkaConfig { /** ********* Socket Server Configuration ***********/ port = parsed.get(PortProp).asInstanceOf[Int], hostName = parsed.get(HostNameProp).asInstanceOf[String], + _listeners = Option(parsed.get(ListenersProp)).map(_.asInstanceOf[String]), _advertisedHostName = Option(parsed.get(AdvertisedHostNameProp)).map(_.asInstanceOf[String]), _advertisedPort = Option(parsed.get(AdvertisedPortProp)).map(_.asInstanceOf[Int]), + _advertisedListeners = Option(parsed.get(AdvertisedListenersProp)).map(_.asInstanceOf[String]), socketSendBufferBytes = parsed.get(SocketSendBufferBytesProp).asInstanceOf[Int], socketReceiveBufferBytes = parsed.get(SocketReceiveBufferBytesProp).asInstanceOf[Int], socketRequestMaxBytes = parsed.get(SocketRequestMaxBytesProp).asInstanceOf[Int], @@ -557,7 +582,8 @@ object KafkaConfig { leaderImbalancePerBrokerPercentage = parsed.get(LeaderImbalancePerBrokerPercentageProp).asInstanceOf[Int], leaderImbalanceCheckIntervalSeconds = parsed.get(LeaderImbalanceCheckIntervalSecondsProp).asInstanceOf[Int], uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean], - + interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]), + interBrokerProtocolVersion = ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]), /** ********* Controlled shutdown configuration ***********/ controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int], controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int], @@ -628,8 +654,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Socket Server Configuration ***********/ val port: Int = Defaults.Port, val hostName: String = Defaults.HostName, + private val _listeners: Option[String] = None, private val _advertisedHostName: Option[String] = None, private val _advertisedPort: Option[Int] = None, + private val _advertisedListeners: Option[String] = None, val socketSendBufferBytes: Int = Defaults.SocketSendBufferBytes, val socketReceiveBufferBytes: Int = Defaults.SocketReceiveBufferBytes, val socketRequestMaxBytes: Int = Defaults.SocketRequestMaxBytes, @@ -697,6 +725,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val leaderImbalancePerBrokerPercentage: Int = Defaults.LeaderImbalancePerBrokerPercentage, val leaderImbalanceCheckIntervalSeconds: Int = Defaults.LeaderImbalanceCheckIntervalSeconds, val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, + val interBrokerSecurityProtocol: SecurityProtocol = SecurityProtocol.valueOf(Defaults.InterBrokerSecurityProtocol), + val interBrokerProtocolVersion: ApiVersion = ApiVersion(Defaults.InterBrokerProtocolVersion), /** ********* Controlled shutdown configuration ***********/ val controlledShutdownMaxRetries: Int = Defaults.ControlledShutdownMaxRetries, @@ -721,8 +751,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs) + val listeners = getListeners() val advertisedHostName: String = _advertisedHostName.getOrElse(hostName) val advertisedPort: Int = _advertisedPort.getOrElse(port) + val advertisedListeners = getAdvertisedListeners() val logDirs = Utils.parseCsvList(_logDirs.getOrElse(_logDir)) val logRollTimeMillis = _logRollTimeMillis.getOrElse(60 * 60 * 1000L * logRollTimeHours) @@ -731,14 +763,6 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val logFlushIntervalMs = _logFlushIntervalMs.getOrElse(logFlushSchedulerIntervalMs) - private def getMap(propName: String, propValue: String): Map[String, String] = { - try { - Utils.parseCsvMap(propValue) - } catch { - case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage)) - } - } - val maxConnectionsPerIpOverrides: Map[String, Int] = getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)} @@ -754,6 +778,56 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ ) } + private def getMap(propName: String, propValue: String): Map[String, String] = { + try { + Utils.parseCsvMap(propValue) + } catch { + case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage)) + } + } + + private def validateUniquePortAndProtocol(listeners: String) { + + val endpoints = try { + val listenerList = Utils.parseCsvList(listeners) + listenerList.map(listener => EndPoint.createEndPoint(listener)) + } catch { + case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage)) + } + val distinctPorts = endpoints.map(ep => ep.port).distinct + val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct + + require(distinctPorts.size == endpoints.size, "Each listener must have a different port") + require(distinctProtocols.size == endpoints.size, "Each listener must have a different protocol") + } + + // If the user did not define listeners but did define host or port, let's use them in backward compatible way + // If none of those are defined, we default to PLAINTEXT://:9092 + private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { + if (_listeners.isDefined) { + validateUniquePortAndProtocol(_listeners.get) + Utils.listenerListToEndPoints(_listeners.get) + } else { + Utils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) + } + } + + // If the user defined advertised listeners, we use those + // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults + // If none of these are defined, we'll use the listeners + private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { + if (_advertisedListeners.isDefined) { + validateUniquePortAndProtocol(_advertisedListeners.get) + Utils.listenerListToEndPoints(_advertisedListeners.get) + } else if (_advertisedHostName.isDefined || _advertisedPort.isDefined ) { + Utils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) + } else { + getListeners() + } + } + + + validateValues() private def validateValues() { @@ -797,8 +871,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ /** ********* Socket Server Configuration ***********/ props.put(PortProp, port.toString) props.put(HostNameProp, hostName) + _listeners.foreach(props.put(ListenersProp, _)) _advertisedHostName.foreach(props.put(AdvertisedHostNameProp, _)) _advertisedPort.foreach(value => props.put(AdvertisedPortProp, value.toString)) + _advertisedListeners.foreach(props.put(AdvertisedListenersProp, _)) props.put(SocketSendBufferBytesProp, socketSendBufferBytes.toString) props.put(SocketReceiveBufferBytesProp, socketReceiveBufferBytes.toString) props.put(SocketRequestMaxBytesProp, socketRequestMaxBytes.toString) @@ -865,6 +941,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(LeaderImbalancePerBrokerPercentageProp, leaderImbalancePerBrokerPercentage.toString) props.put(LeaderImbalanceCheckIntervalSecondsProp, leaderImbalanceCheckIntervalSeconds.toString) props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) + props.put(InterBrokerSecurityProtocolProp, interBrokerSecurityProtocol.toString) + props.put(InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString) + /** ********* Controlled shutdown configuration ***********/ props.put(ControlledShutdownMaxRetriesProp, controlledShutdownMaxRetries.toString)
http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaHealthcheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 7907987..861b7f6 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -17,7 +17,9 @@ package kafka.server +import kafka.cluster.EndPoint import kafka.utils._ +import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import java.net.InetAddress @@ -31,9 +33,8 @@ import java.net.InetAddress * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ -class KafkaHealthcheck(private val brokerId: Int, - private val advertisedHost: String, - private val advertisedPort: Int, +class KafkaHealthcheck(private val brokerId: Int, + private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +50,19 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val advertisedHostName = - if(advertisedHost == null || advertisedHost.trim.isEmpty) - InetAddress.getLocalHost.getCanonicalHostName - else - advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) + val updatedEndpoints = advertisedEndpoints.mapValues(endpoint => + if (endpoint.host == null || endpoint.host.trim.isEmpty) + EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) + else + endpoint + ) + + // the default host and port are here for compatibility with older client + // only PLAINTEXT is supported as default + // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect + val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) + ZkUtils.registerBrokerInZk(zkClient, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 10ea77a..9df2cf4 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,10 +25,11 @@ import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File + import collection.mutable import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} -import kafka.cluster.Broker +import kafka.cluster.{EndPoint, Broker} import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} import kafka.network.{Receive, BlockingChannel, SocketServer} @@ -117,57 +118,61 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg this.logIdent = "[Kafka Server " + config.brokerId + "], " socketServer = new SocketServer(config.brokerId, - config.hostName, - config.port, - config.numNetworkThreads, - config.queuedMaxRequests, - config.socketSendBufferBytes, - config.socketReceiveBufferBytes, - config.socketRequestMaxBytes, - config.maxConnectionsPerIp, - config.connectionsMaxIdleMs, - config.maxConnectionsPerIpOverrides) - socketServer.startup() - - /* start replica manager */ - replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) - replicaManager.startup() - - /* start offset manager */ - offsetManager = createOffsetManager() - - /* start kafka controller */ - kafkaController = new KafkaController(config, zkClient, brokerState) - kafkaController.startup() - - /* start kafka coordinator */ - consumerCoordinator = new ConsumerCoordinator(config, zkClient) - consumerCoordinator.startup() - - /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache) - requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - brokerState.newState(RunningAsBroker) - - Mx4jLoader.maybeLoad() - - /* start topic config manager */ - topicConfigManager = new TopicConfigManager(zkClient, logManager) - topicConfigManager.startup() - - /* tell everyone we are alive */ - val advertisedPort = if (config.advertisedPort != 0) config.advertisedPort else socketServer.boundPort() - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, advertisedPort, config.zkSessionTimeoutMs, zkClient) - kafkaHealthcheck.startup() - - /* register broker metrics */ - registerStats() - - shutdownLatch = new CountDownLatch(1) - startupComplete.set(true) - isStartingUp.set(false) - info("started") + config.listeners, + config.numNetworkThreads, + config.queuedMaxRequests, + config.socketSendBufferBytes, + config.socketReceiveBufferBytes, + config.socketRequestMaxBytes, + config.maxConnectionsPerIp, + config.connectionsMaxIdleMs, + config.maxConnectionsPerIpOverrides) + socketServer.startup() + + /* start replica manager */ + replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) + replicaManager.startup() + + /* start offset manager */ + offsetManager = createOffsetManager() + + /* start kafka controller */ + kafkaController = new KafkaController(config, zkClient, brokerState) + kafkaController.startup() + + /* start kafka coordinator */ + consumerCoordinator = new ConsumerCoordinator(config, zkClient) + consumerCoordinator.startup() + + /* start processing requests */ + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, + kafkaController, zkClient, config.brokerId, config, metadataCache) + requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) + brokerState.newState(RunningAsBroker) + + Mx4jLoader.maybeLoad() + + /* start topic config manager */ + topicConfigManager = new TopicConfigManager(zkClient, logManager) + topicConfigManager.startup() + + /* tell everyone we are alive */ + val listeners = config.advertisedListeners.map {case(protocol, endpoint) => + if (endpoint.port == 0) + (protocol, EndPoint(endpoint.host, socketServer.boundPort(), endpoint.protocolType)) + else + (protocol, endpoint) + } + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck.startup() + + /* register broker metrics */ + registerStats() + + shutdownLatch = new CountDownLatch(1) + startupComplete.set(true) + isStartingUp.set(false) + info("started") } } catch { @@ -243,7 +248,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (channel != null) { channel.disconnect() } - channel = new BlockingChannel(broker.host, broker.port, + channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host, + broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) @@ -421,7 +427,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg * <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id * <ol> - * @returns A brokerId. + * @return A brokerId. */ private def getBrokerId: Int = { var brokerId = config.brokerId http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 6aef6e4..008f02b 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,10 +17,13 @@ package kafka.server +import kafka.cluster.{BrokerEndpoint,Broker} +import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException} +import kafka.common.TopicAndPartition + import kafka.api._ -import kafka.common._ -import kafka.cluster.Broker import kafka.controller.KafkaController.StateChangeLogger +import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.{Seq, Set, mutable} import kafka.utils.Logging import kafka.utils.Utils._ @@ -39,7 +42,8 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { this.logIdent = "[Kafka Metadata Cache on broker %d] ".format(brokerId) - def getTopicMetadata(topics: Set[String]) = { + def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol) = { + val isAllTopics = topics.isEmpty val topicsRequested = if(isAllTopics) cache.keySet else topics val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata] @@ -50,18 +54,21 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { val partitionMetadata = partitionStateInfos.map { case (partitionId, partitionState) => val replicas = partitionState.allReplicas - val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq - var leaderInfo: Option[Broker] = None - var isrInfo: Seq[Broker] = Nil + val replicaInfo: Seq[BrokerEndpoint] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq.map(_.getBrokerEndPoint(protocol)) + var leaderInfo: Option[BrokerEndpoint] = None + var leaderBrokerInfo: Option[Broker] = None + var isrInfo: Seq[BrokerEndpoint] = Nil val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch val leader = leaderIsrAndEpoch.leaderAndIsr.leader val isr = leaderIsrAndEpoch.leaderAndIsr.isr val topicPartition = TopicAndPartition(topic, partitionId) try { - leaderInfo = aliveBrokers.get(leader) - if (!leaderInfo.isDefined) + leaderBrokerInfo = aliveBrokers.get(leader) + if (!leaderBrokerInfo.isDefined) throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition)) - isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null) + else + leaderInfo = Some(leaderBrokerInfo.get.getBrokerEndPoint(protocol)) + isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).map(_.getBrokerEndPoint(protocol)) if (replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 351dbba..f0a2a5b 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -17,13 +17,13 @@ package kafka.server -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, "Replica", brokerConfig.numReplicaFetchers) { - override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { + override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndpoint): AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr) } http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 96faa7b..b2196c8 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -18,14 +18,14 @@ package kafka.server import kafka.admin.AdminUtils -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} import kafka.common.{KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, - sourceBroker: Broker, + sourceBroker: BrokerEndpoint, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) extends AbstractFetcherThread(name = name, http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6e43622..144a15e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.api._ import kafka.common._ import kafka.utils._ -import kafka.cluster.{Broker, Partition, Replica} +import kafka.cluster.{BrokerEndpoint, Partition, Replica} import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController @@ -684,7 +684,7 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it */ private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], + leaders: Set[BrokerEndpoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], offsetManager: OffsetManager) { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..d2bac85 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -24,11 +24,11 @@ import kafka.utils._ import kafka.consumer.SimpleConsumer import kafka.api.{OffsetFetchResponse, OffsetFetchRequest, OffsetRequest} import kafka.common.{OffsetMetadataAndError, ErrorMapping, BrokerNotAvailableException, TopicAndPartition} +import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ import kafka.client.ClientUtils import kafka.network.BlockingChannel import kafka.api.PartitionOffsetRequestInfo -import scala.Some import org.I0Itec.zkclient.exception.ZkNoNodeException object ConsumerOffsetChecker extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index ba6ddd7..d1050b4 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -18,7 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference @@ -197,7 +197,7 @@ private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, c private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], leadersPerBroker: Map[Int, Seq[TopicAndPartition]], expectedNumFetchers: Int, - brokerMap: Map[Int, Broker], + brokerMap: Map[Int, BrokerEndpoint], initialOffsetTime: Long, reportInterval: Long) extends Logging { private val fetchOffsetMap = new Pool[TopicAndPartition, Long] @@ -335,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } } -private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], +private class ReplicaFetcher(name: String, sourceBroker: BrokerEndpoint, topicAndPartitions: Iterable[TopicAndPartition], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) extends ShutdownableThread(name) { http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index b4f903b..7379fe3 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -22,7 +22,7 @@ import kafka.utils._ import kafka.consumer._ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} -import kafka.cluster.Broker +import kafka.cluster.BrokerEndpoint import scala.collection.JavaConversions._ import kafka.common.TopicAndPartition @@ -142,8 +142,8 @@ object SimpleConsumerShell extends Logging { } // validating replica id and initializing target broker - var fetchTargetBroker: Broker = null - var replicaOpt: Option[Broker] = null + var fetchTargetBroker: BrokerEndpoint = null + var replicaOpt: Option[BrokerEndpoint] = null if(replicaId == UseLeaderReplica) { replicaOpt = partitionMetadataOpt.get.leader if(!replicaOpt.isDefined) { @@ -167,7 +167,9 @@ object SimpleConsumerShell extends Logging { System.exit(1) } if (startingOffset < 0) { - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout, + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, + fetchTargetBroker.port, + ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize, clientId) try { startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset, @@ -188,8 +190,12 @@ object SimpleConsumerShell extends Logging { val replicaString = if(replicaId > 0) "leader" else "replica" info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]" - .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset)) - val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId) + .format(topic, partitionId, replicaString, replicaId, + fetchTargetBroker.host, + fetchTargetBroker.port, startingOffset)) + val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, + fetchTargetBroker.port, + 10000, 64*1024, clientId) val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() { def run() { var offset = startingOffset http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 111c9a8..e82cb81 100644 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -22,6 +22,7 @@ import kafka.consumer.{SimpleConsumer, ConsumerConfig} import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.{TopicAndPartition, KafkaException} import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, Utils} +import org.apache.kafka.common.protocol.SecurityProtocol /** @@ -65,7 +66,9 @@ object UpdateOffsetsInZK { ZkUtils.getBrokerInfo(zkClient, broker) match { case Some(brokerInfo) => - val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024, "UpdateOffsetsInZk") + val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host, + brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port, + 10000, 100 * 1024, "UpdateOffsetsInZk") val topicAndPartition = TopicAndPartition(topic, partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1))) val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/utils/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 894a6a6..afc3b4e 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -24,9 +24,13 @@ import java.nio.channels._ import java.util.concurrent.locks.{ReadWriteLock, Lock} import java.lang.management._ import javax.management._ + +import org.apache.kafka.common.protocol.SecurityProtocol + import scala.collection._ import scala.collection.mutable import java.util.Properties +import kafka.cluster.EndPoint import kafka.common.KafkaException import kafka.common.KafkaStorageException @@ -607,4 +611,9 @@ object Utils extends Logging { .filter{ case (k,l) => (l > 1) } .keys } + + def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = { + val listenerList = parseCsvList(listeners) + listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7ae999e..82b0e33 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,13 +17,14 @@ package kafka.utils -import kafka.cluster.{Broker, Cluster} +import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} import org.I0Itec.zkclient.serialize.ZkSerializer import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.protocol.SecurityProtocol import collection._ import kafka.api.LeaderAndIsr import org.apache.zookeeper.data.Stat @@ -31,10 +32,8 @@ import kafka.admin._ import kafka.common.{KafkaException, NoEpochForPartitionException} import kafka.controller.ReassignedPartitionsContext import kafka.controller.KafkaController -import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition -import scala.collection object ZkUtils extends Logging { val ConsumersPath = "/consumers" @@ -84,6 +83,10 @@ object ZkUtils extends Logging { brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) } + def getAllBrokerEndPointsForChannel(zkClient: ZkClient, protocolType: SecurityProtocol): Seq[BrokerEndpoint] = { + getAllBrokersInCluster(zkClient).map(_.getBrokerEndPoint(protocolType)) + } + def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = { ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr) } @@ -169,12 +172,28 @@ object ZkUtils extends Logging { } } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { + /** + * Register brokers with v2 json format (which includes multiple endpoints). + * This format also includes default endpoints for compatibility with older clients. + * @param zkClient + * @param id + * @param advertisedEndpoints + * @param timeout + * @param jmxPort + */ + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = SystemTime.milliseconds.toString - val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) - val expectedBroker = new Broker(id, host, port) + val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) + val expectedBroker = new Broker(id, advertisedEndpoints) + + registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout) + + info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) + } + + private def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) { try { createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]), @@ -183,11 +202,10 @@ object ZkUtils extends Logging { } catch { case e: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath - + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " - + "else you have shutdown this broker and restarted it faster than the zookeeper " - + "timeout so it appears to be re-registering.") + + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + + "else you have shutdown this broker and restarted it faster than the zookeeper " + + "timeout so it appears to be re-registering.") } - info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index aba256d..9811a2b 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,8 +17,6 @@ package kafka.api -import java.lang.{Integer, IllegalArgumentException} - import org.apache.kafka.clients.producer._ import org.scalatest.junit.JUnit3Suite import org.junit.Test http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index a106379..9881bd3 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -3,6 +3,7 @@ package other.kafka import org.I0Itec.zkclient.ZkClient import kafka.api._ import kafka.utils.{ShutdownableThread, ZKStringSerializer} +import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ import kafka.client.ClientUtils import joptsimple.OptionParser @@ -12,7 +13,6 @@ import scala.util.Random import java.io.IOException import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import java.util.concurrent.TimeUnit -import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicInteger import java.nio.channels.ClosedByInterruptException http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 4d36b8b..bc4aef3 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -65,14 +65,14 @@ class KafkaTest { assertEquals(2, config2.brokerId) // We should be also able to set completely new property - val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987")) + val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")) assertEquals(1, config3.brokerId) - assertEquals(1987, config3.port) + assertEquals("compact", config3.logCleanupPolicy) // We should be also able to set several properties - val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "port=1987", "--override", "broker.id=2")) + val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2")) assertEquals(2, config4.brokerId) - assertEquals(1987, config4.port) + assertEquals("compact", config4.logCleanupPolicy) } @Test(expected = classOf[ExitCalled]) http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 99ac923..76b8b24 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,6 +17,7 @@ package kafka.admin +import org.apache.kafka.common.protocol.SecurityProtocol import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ @@ -92,7 +93,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata @@ -117,7 +118,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas", 2000,0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata @@ -141,7 +142,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacement", 2000,0).topicsMetadata val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 030faac..83910f3 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -17,18 +17,19 @@ package kafka.api + +import kafka.cluster.{BrokerEndpoint, EndPoint, Broker} +import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError} import kafka.common._ -import kafka.cluster.Broker -import kafka.controller.LeaderIsrAndControllerEpoch import kafka.message.{Message, ByteBufferMessageSet} import kafka.utils.SystemTime -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.requests._ +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.common.TopicAndPartition -import scala.Some import java.nio.ByteBuffer +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit._ import org.scalatest.junit.JUnitSuite import junit.framework.Assert._ @@ -81,21 +82,47 @@ object SerializationTestUtils { TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013)) - private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0) - private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1) - private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2) - private val partitionMetaData3 = new PartitionMetadata(3, Some(brokers.head), replicas = brokers, isr = brokers.tail.tail, errorCode = 3) + private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))), + new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))), + new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT)))) + private val brokerEndpoints = brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + + private val partitionMetaData0 = new PartitionMetadata(0, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 0) + private val partitionMetaData1 = new PartitionMetadata(1, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail, errorCode = 1) + private val partitionMetaData2 = new PartitionMetadata(2, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints, errorCode = 2) + private val partitionMetaData3 = new PartitionMetadata(3, Some(brokerEndpoints.head), replicas = brokerEndpoints, isr = brokerEndpoints.tail.tail, errorCode = 3) private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) + private val leaderAndIsr0 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id)) + private val leaderAndIsr1 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id)) + private val leaderAndIsr2 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.map(_.id)) + private val leaderAndIsr3 = new LeaderAndIsr(leader = brokers.head.id, isr = brokers.tail.map(_.id)) + + private val leaderIsrAndControllerEpoch0 = new LeaderIsrAndControllerEpoch(leaderAndIsr0, controllerEpoch = 0) + private val leaderIsrAndControllerEpoch1 = new LeaderIsrAndControllerEpoch(leaderAndIsr1, controllerEpoch = 0) + private val leaderIsrAndControllerEpoch2 = new LeaderIsrAndControllerEpoch(leaderAndIsr2, controllerEpoch = 0) + private val leaderIsrAndControllerEpoch3 = new LeaderIsrAndControllerEpoch(leaderAndIsr3, controllerEpoch = 0) + + private val partitionStateInfo0 = new PartitionStateInfo(leaderIsrAndControllerEpoch0, brokers.map(_.id).toSet) + private val partitionStateInfo1 = new PartitionStateInfo(leaderIsrAndControllerEpoch1, brokers.map(_.id).toSet) + private val partitionStateInfo2 = new PartitionStateInfo(leaderIsrAndControllerEpoch2, brokers.map(_.id).toSet) + private val partitionStateInfo3 = new PartitionStateInfo(leaderIsrAndControllerEpoch3, brokers.map(_.id).toSet) + + private val updateMetadataRequestPartitionStateInfo = collection.immutable.Map( + TopicAndPartition(topic1,0) -> partitionStateInfo0, + TopicAndPartition(topic1,1) -> partitionStateInfo1, + TopicAndPartition(topic1,2) -> partitionStateInfo2, + TopicAndPartition(topic1,3) -> partitionStateInfo3 + ) + def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = { val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1) val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1) val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)), ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet))) - new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "") + new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndpoint](), 0, 1, 0, "") } def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = { @@ -149,7 +176,7 @@ object SerializationTestUtils { } def createTestTopicMetadataResponse: TopicMetadataResponse = { - new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1) + new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)).toVector, Seq(topicmetaData1, topicmetaData2), 1) } def createTestOffsetCommitRequestV2: OffsetCommitRequest = { @@ -206,7 +233,23 @@ object SerializationTestUtils { } def createConsumerMetadataResponse: ConsumerMetadataResponse = { - ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0) + ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) + } + + def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = { + UpdateMetadataRequest( + versionId, + correlationId = 0, + clientId = "client1", + controllerId = 0, + controllerEpoch = 0, + partitionStateInfos = updateMetadataRequestPartitionStateInfo, + brokers.toSet + ) + } + + def createUpdateMetadataResponse: UpdateMetadataResponse = { + UpdateMetadataResponse( correlationId = 0, errorCode = 0) } } @@ -231,6 +274,10 @@ class RequestResponseSerializationTest extends JUnitSuite { private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) + private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0) + private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1) + private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse + @Test def testSerializationAndDeserialization() { @@ -243,6 +290,7 @@ class RequestResponseSerializationTest extends JUnitSuite { offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2, offsetCommitResponse, offsetFetchRequest, offsetFetchResponse, consumerMetadataRequest, consumerMetadataResponse, + updateMetadataRequestV0, updateMetadataRequestV1, updateMetdataResponse, consumerMetadataResponseNoCoordinator) requestsAndResponses.foreach { original => http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala new file mode 100644 index 0000000..ad58eed --- /dev/null +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.cluster + +import java.nio.ByteBuffer + +import kafka.utils.Logging +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.Test +import org.scalatest.junit.JUnit3Suite + +import scala.collection.mutable + +class BrokerEndPointTest extends JUnit3Suite with Logging { + + @Test + def testSerDe() = { + + val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint) + val origBroker = new Broker(1, listEndPoints) + val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes) + + origBroker.writeTo(brokerBytes) + + val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer]) + assert(origBroker == newBroker) + } + + @Test + def testHashAndEquals() = { + val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) + val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT) + val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT) + val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1)) + val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2)) + val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3)) + val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4)) + + assert(broker1 == broker2) + assert(broker1 != broker3) + assert(broker1 != broker4) + assert(broker1.hashCode() == broker2.hashCode()) + assert(broker1.hashCode() != broker3.hashCode()) + assert(broker1.hashCode() != broker4.hashCode()) + + val hashmap = new mutable.HashMap[Broker, Int]() + hashmap.put(broker1, 1) + assert(hashmap.getOrElse(broker1, -1) == 1) + } + + @Test + def testFromJSON() = { + val brokerInfoStr = "{\"version\":2," + + "\"host\":\"localhost\"," + + "\"port\":9092," + + "\"jmx_port\":9999," + + "\"timestamp\":\"1416974968782\"," + + "\"endpoints\":[\"PLAINTEXT://localhost:9092\"]}" + val broker = Broker.createBroker(1, brokerInfoStr) + assert(broker.id == 1) + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost") + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092) + } + + @Test + def testFromOldJSON() = { + val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}" + val broker = Broker.createBroker(1, brokerInfoStr) + assert(broker.id == 1) + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243") + assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091) + } + + @Test + def testBrokerEndpointFromURI() = { + var connectionString = "localhost:9092" + var endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + assert(endpoint.host == "localhost") + assert(endpoint.port == 9092) + // also test for ipv6 + connectionString = "[::1]:9092" + endpoint = BrokerEndpoint.createBrokerEndPoint(1, connectionString) + assert(endpoint.host == "::1") + assert(endpoint.port == 9092) + } + + @Test + def testEndpointFromURI() = { + var connectionString = "PLAINTEXT://localhost:9092" + var endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == "localhost") + assert(endpoint.port == 9092) + assert(endpoint.connectionString == "PLAINTEXT://localhost:9092") + // also test for default bind + connectionString = "PLAINTEXT://:9092" + endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == null) + assert(endpoint.port == 9092) + assert(endpoint.connectionString == "PLAINTEXT://:9092") + // also test for ipv6 + connectionString = "PLAINTEXT://[::1]:9092" + endpoint = EndPoint.createEndPoint(connectionString) + assert(endpoint.host == "::1") + assert(endpoint.port == 9092) + assert(endpoint.connectionString == "PLAINTEXT://[::1]:9092") + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index ecb5a33..0dc837a 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -28,7 +28,6 @@ import org.scalatest.junit.JUnit3Suite import kafka.consumer._ import kafka.serializer._ import kafka.producer.{KeyedMessage, Producer} -import kafka.utils.TestUtils._ import kafka.utils.TestUtils class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { @@ -37,14 +36,13 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val queue = new LinkedBlockingQueue[FetchedDataChunk] var fetcher: ConsumerFetcherManager = null override def setUp() { super.setUp - createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) + TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))) http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 28e3122..f95fb62 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -18,9 +18,10 @@ package kafka.integration import java.util.Arrays + import scala.collection.mutable.Buffer import kafka.server._ -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{TestUtils, Utils} import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.common.KafkaException @@ -47,8 +48,9 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { } def serverForId(id: Int) = servers.find(s => s.config.brokerId == id) - + def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",") + override def setUp() { super.setUp http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 56b1b8c..603cf76 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -17,12 +17,13 @@ package kafka.integration +import org.apache.kafka.common.protocol.SecurityProtocol import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.admin.AdminUtils import java.nio.ByteBuffer import junit.framework.Assert._ -import kafka.cluster.Broker +import kafka.cluster.{BrokerEndpoint, Broker} import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.server.{KafkaServer, KafkaConfig} @@ -32,14 +33,14 @@ import kafka.client.ClientUtils class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { private var server1: KafkaServer = null - var brokers: Seq[Broker] = null + var brokerEndPoints: Seq[BrokerEndpoint] = null override def setUp() { super.setUp() val props = createBrokerConfigs(1, zkConnect) val configs = props.map(KafkaConfig.fromProps) server1 = TestUtils.createServer(configs.head) - brokers = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort())) + brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) } override def tearDown() { @@ -68,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = "test" createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) @@ -88,7 +89,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata", 2000, 0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(2, topicsMetadata.size) @@ -107,7 +108,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testAutoCreateTopic { // auto create topic val topic = "testAutoCreateTopic" - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic", + var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic", 2000,0).topicsMetadata assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode) assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) @@ -119,7 +120,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic - topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index ac4c5b9..bd9a409 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -32,7 +32,7 @@ class LogTest extends JUnitSuite { var logDir: File = null val time = new MockTime(0) var config: KafkaConfig = null - val logConfig = LogConfig() + val logConfig = LogConfig() @Before def setUp() { http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 79a806c..95d5621 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -19,6 +19,8 @@ package kafka.network; import java.net._ import java.io._ +import kafka.cluster.EndPoint +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit._ import org.scalatest.junit.JUnitSuite import java.util.Random @@ -35,8 +37,8 @@ import scala.collection.Map class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, - host = null, - port = 0, + Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT), + SecurityProtocol.TRACE -> EndPoint(null, 0, SecurityProtocol.TRACE)), numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, @@ -73,7 +75,10 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } - def connect(s:SocketServer = server) = new Socket("localhost", s.boundPort) + def connect(s:SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { + new Socket("localhost", server.boundPort(protocol)) + } + @After def cleanup() { @@ -81,7 +86,8 @@ class SocketServerTest extends JUnitSuite { } @Test def simpleRequest() { - val socket = connect() + val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val traceSocket = connect(protocol = SecurityProtocol.TRACE) val correlationId = -1 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -95,9 +101,15 @@ class SocketServerTest extends JUnitSuite { val serializedBytes = new Array[Byte](byteBuffer.remaining) byteBuffer.get(serializedBytes) - sendRequest(socket, 0, serializedBytes) + // Test PLAINTEXT socket + sendRequest(plainSocket, 0, serializedBytes) + processRequest(server.requestChannel) + assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) + + // Test TRACE socket + sendRequest(traceSocket, 0, serializedBytes) processRequest(server.requestChannel) - assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq) + assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq) } @Test(expected = classOf[IOException]) @@ -129,21 +141,38 @@ class SocketServerTest extends JUnitSuite { "Socket key should be available for reads") } - @Test(expected = classOf[IOException]) + @Test def testSocketsCloseOnShutdown() { // open a connection - val socket = connect() + val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val traceSocket = connect(protocol = SecurityProtocol.TRACE) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server - sendRequest(socket, 0, bytes) + sendRequest(plainSocket, 0, bytes) + sendRequest(traceSocket, 0, bytes) processRequest(server.requestChannel) + + // make sure the sockets are open + server.acceptors.values.map(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server server.shutdown() val largeChunkOfBytes = new Array[Byte](1000000) // doing a subsequent send should throw an exception as the connection should be closed. // send a large chunk of bytes to trigger a socket flush - sendRequest(socket, 0, largeChunkOfBytes) + try { + sendRequest(plainSocket, 0, largeChunkOfBytes) + fail("expected exception when writing to closed plain socket") + } catch { + case e: IOException => // expected + } + + try { + sendRequest(traceSocket, 0, largeChunkOfBytes) + fail("expected exception when writing to closed trace socket") + } catch { + case e: IOException => // expected + } } @Test @@ -161,8 +190,7 @@ class SocketServerTest extends JUnitSuite { val overrideNum = 6 val overrides: Map[String, Int] = Map("localhost" -> overrideNum) val overrideServer: SocketServer = new SocketServer(0, - host = null, - port = 0, + Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT)), numProcessorThreads = 1, maxQueuedRequests = 50, sendBufferSize = 300000, http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index d2ab683..3b82fb3 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -23,7 +23,7 @@ import junit.framework.Assert._ import org.easymock.EasyMock import org.junit.Test import kafka.api._ -import kafka.cluster.Broker +import kafka.cluster.{BrokerEndpoint, Broker} import kafka.common._ import kafka.message._ import kafka.producer.async._ @@ -165,8 +165,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("metadata.broker.list", brokerList) - val broker1 = new Broker(0, "localhost", 9092) - val broker2 = new Broker(1, "localhost", 9093) + val broker1 = new BrokerEndpoint(0, "localhost", 9092) + val broker2 = new BrokerEndpoint(1, "localhost", 9093) // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -469,7 +469,7 @@ class AsyncProducerTest extends JUnit3Suite { } private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new Broker(brokerId, brokerHost, brokerPort) + val broker1 = new BrokerEndpoint(brokerId, brokerHost, brokerPort) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 7d6f655..8c3fb7a 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -19,16 +19,18 @@ package kafka.producer import java.net.SocketTimeoutException import java.util.Properties + import junit.framework.Assert import kafka.admin.AdminUtils +import kafka.api.ProducerResponseStatus +import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.integration.KafkaServerTestHarness import kafka.message._ import kafka.server.KafkaConfig import kafka.utils._ +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Test import org.scalatest.junit.JUnit3Suite -import kafka.api.ProducerResponseStatus -import kafka.common.{TopicAndPartition, ErrorMapping} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private val messageBytes = new Array[Byte](2) @@ -38,7 +40,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testReachableServer() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds @@ -73,7 +77,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId @@ -90,7 +95,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testMessageSizeTooLarge() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) val producer = new SyncProducer(new SyncProducerConfig(props)) TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers) @@ -117,8 +122,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testMessageSizeTooLargeWithAckZero() { val server = servers.head + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) props.put("request.required.acks", "0") val producer = new SyncProducer(new SyncProducerConfig(props)) @@ -144,7 +149,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -190,7 +195,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -216,7 +221,9 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceRequestWithNoResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) + + val port = server.socketServer.boundPort(SecurityProtocol.PLAINTEXT) + val props = TestUtils.getSyncProducerConfig(port) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs @@ -231,8 +238,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { def testNotEnoughReplicas() { val topicName = "minisrtest" val server = servers.head + val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort) props.put("request.required.acks", "-1") val producer = new SyncProducer(new SyncProducerConfig(props)) http://git-wip-us.apache.org/repos/asf/kafka/blob/53f31432/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index b011240..48d3143 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -17,10 +17,11 @@ package kafka.server -import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ -import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.utils.{TestUtils, Utils, ZkUtils} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol +import org.scalatest.junit.JUnit3Suite class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { var server : KafkaServer = null @@ -30,10 +31,11 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { override def setUp() { super.setUp() + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) props.put("advertised.host.name", advertisedHostName) props.put("advertised.port", advertisedPort.toString) - + server = TestUtils.createServer(KafkaConfig.fromProps(props)) } @@ -45,8 +47,9 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { def testBrokerAdvertiseToZK { val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) - assertEquals(advertisedHostName, brokerInfo.get.host) - assertEquals(advertisedPort, brokerInfo.get.port) + val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get + assertEquals(advertisedHostName, endpoint.host) + assertEquals(advertisedPort, endpoint.port) } } \ No newline at end of file