Updated Branches: refs/heads/trunk bf4dbd5ee -> a55ec0620
kafka-1092; Add server config parameter to separate bind address and ZK hostname; patched by Roger Hoover; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a55ec062 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a55ec062 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a55ec062 Branch: refs/heads/trunk Commit: a55ec0620f6ce805fafe2e1d4035ec3e0ab4e0d0 Parents: bf4dbd5 Author: Roger Hoover <[email protected]> Authored: Wed Oct 30 21:06:23 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Oct 30 21:06:23 2013 -0700 ---------------------------------------------------------------------- config/server.properties | 13 +++-- .../main/scala/kafka/server/KafkaConfig.scala | 13 ++++- .../scala/kafka/server/KafkaHealthcheck.scala | 14 +++--- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../unit/kafka/producer/ProducerTest.scala | 12 ++--- .../unit/kafka/server/AdvertiseBrokerTest.scala | 52 ++++++++++++++++++++ .../unit/kafka/server/KafkaConfigTest.scala | 34 ++++++++++++- 7 files changed, 118 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index 2eccc5e..8efa83f 100644 --- a/config/server.properties +++ b/config/server.properties @@ -24,11 +24,18 @@ broker.id=0 # The port the socket server listens on port=9092 -# Hostname the broker will bind to and advertise to producers and consumers. -# If not set, the server will bind to all interfaces and advertise the value returned from -# from java.net.InetAddress.getCanonicalHostName(). +# Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + # The number of threads handling network requests num.network.threads=2 http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 74442b6..b324344 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -69,8 +69,19 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val port: Int = props.getInt("port", 6667) /* hostname of broker. If this is set, it will only bind to this address. If this is not set, - * it will bind to all interfaces, and publish one to ZK */ + * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) + + /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the interface to which the broker binds. If this is not set, + * it will use the value for "host.name" if configured. Otherwise + * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ + val advertisedHostName: String = props.getString("advertised.host.name", hostName) + + /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the port to which the broker binds. If this is not set, + * it will publish the same port that the broker binds to. */ + val advertisedPort: Int = props.getInt("advertised.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaHealthcheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 84ea17a..9dca55c 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> host:port + * /brokers/[0...N] --> advertisedHost:advertisedPort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val host: String, - private val port: Int, + private val advertisedHost: String, + private val advertisedPort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val hostName = - if(host == null || host.trim.isEmpty) + val advertisedHostName = + if(advertisedHost == null || advertisedHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - host + advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e35a89..5e34f95 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 2fb059b..4b2e4ad 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -49,15 +49,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private var servers = List.empty[KafkaServer] private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - private val config1 = new KafkaConfig(props1) { - override val hostName = "localhost" - override val numPartitions = 4 - } + props1.put("num.partitions", "4") + private val config1 = new KafkaConfig(props1) private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - private val config2 = new KafkaConfig(props2) { - override val hostName = "localhost" - override val numPartitions = 4 - } + props2.put("num.partitions", "4") + private val config2 = new KafkaConfig(props2) override def setUp() { super.setUp() http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala new file mode 100644 index 0000000..f0c4a56 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} + +class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { + var server : KafkaServer = null + val brokerId = 0 + val advertisedHostName = "routable-host" + val advertisedPort = 1234 + + override def setUp() { + super.setUp() + val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + props.put("advertised.host.name", advertisedHostName) + props.put("advertised.port", advertisedPort.toString) + + server = TestUtils.createServer(new KafkaConfig(props)) + } + + override def tearDown() { + server.shutdown() + Utils.rm(server.config.logDirs) + super.tearDown() + } + + def testBrokerAdvertiseToZK { + val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) + assertEquals(advertisedHostName, brokerInfo.get.host) + assertEquals(advertisedPort, brokerInfo.get.port) + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/a55ec062/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2f75e1d..89c207a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -64,4 +64,34 @@ class KafkaConfigTest extends JUnit3Suite { } -} \ No newline at end of file + @Test + def testAdvertiseDefaults() { + val port = 9999 + val hostName = "fake-host" + + val props = TestUtils.createBrokerConfig(0, port) + props.put("host.name", hostName) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertisedHostName, hostName) + assertEquals(serverConfig.advertisedPort, port) + } + + @Test + def testAdvertiseConfigured() { + val port = 9999 + val advertisedHostName = "routable-host" + val advertisedPort = 1234 + + val props = TestUtils.createBrokerConfig(0, port) + props.put("advertised.host.name", advertisedHostName) + props.put("advertised.port", advertisedPort.toString) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertisedHostName, advertisedHostName) + assertEquals(serverConfig.advertisedPort, advertisedPort) + } + +}
