KAFKA-675 Allow the user to override the host that we bind to. Patch from Matan Amir <matan.a...@voxer.com> with slight changes to improve error messages for a bad host or port.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6d41102 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6d41102 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6d41102 Branch: refs/heads/trunk Commit: c6d41102d81ac48b345d3f42d669e6a6bbfbe062 Parents: 85ec044 Author: Jay Kreps <jay.kr...@gmail.com> Authored: Thu Dec 20 14:13:22 2012 -0800 Committer: Jay Kreps <jay.kr...@gmail.com> Committed: Thu Dec 20 14:13:22 2012 -0800 ---------------------------------------------------------------------- config/server.properties | 11 ++-- .../main/scala/kafka/network/SocketServer.scala | 40 +++++++++++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +- core/src/main/scala/kafka/server/KafkaServer.scala | 1 + .../main/scala/kafka/server/KafkaZooKeeper.scala | 8 ++- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- .../unit/kafka/network/SocketServerTest.scala | 1 + 7 files changed, 48 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index e92f599..f4521fb 100644 --- a/config/server.properties +++ b/config/server.properties @@ -19,17 +19,16 @@ # The id of the broker. This must be set to a unique integer for each broker. brokerid=0 -# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned -# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost -# may not be what you want. -#hostname= - - ############################# Socket Server Settings ############################# # The port the socket server listens on port=9092 +# Hostname the broker will bind to and advertise to producers and consumers. 
+# If not set, the server will bind to all interfaces and advertise the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#hostname=localhost + # The number of threads handling network requests network.threads=2 http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 9cdadd7..2102fbf 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -23,6 +23,7 @@ import java.net._ import java.io._ import java.nio.channels._ +import kafka.common.KafkaException import kafka.utils._ /** @@ -32,6 +33,7 @@ import kafka.utils._ * M Handler threads that handle requests and produce responses back to the processor threads for writing. */ class SocketServer(val brokerId: Int, + val host: String, val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, @@ -39,7 +41,7 @@ class SocketServer(val brokerId: Int, this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - private var acceptor: Acceptor = new Acceptor(port, processors) + @volatile private var acceptor: Acceptor = null val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /** @@ -54,6 +56,7 @@ class SocketServer(val brokerId: Int, requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections + this.acceptor = new Acceptor(host, port, processors) Utils.newThread("kafka-acceptor", acceptor, false).start() acceptor.awaitStartup info("started") @@ -64,10 +67,11 @@ class SocketServer(val brokerId: Int, */ def shutdown() = { info("shutting down") - acceptor.shutdown + if(acceptor != null) 
acceptor.shutdown() for(processor <- processors) - processor.shutdown - info("shutted down completely") + processor.shutdown() + info("shut down completely") } } @@ -123,17 +127,14 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { +private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { + val serverChannel = openServerSocket(host, port) /** * Accept loop that checks for new connection attempts */ def run() { - val serverChannel = ServerSocketChannel.open() - serverChannel.configureBlocking(false) - serverChannel.socket.bind(new InetSocketAddress(port)) serverChannel.register(selector, SelectionKey.OP_ACCEPT); - info("Awaiting connections on port " + port) startupComplete() var currentProcessor = 0 while(isRunning) { @@ -164,6 +165,27 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce swallowError(selector.close()) shutdownComplete() } + + /* + * Create a server socket to listen for connections on. 
+ */ + def openServerSocket(host: String, port: Int): ServerSocketChannel = { + val socketAddress = + if(host == null || host.trim.isEmpty) + new InetSocketAddress(port) + else + new InetSocketAddress(host, port) + val serverChannel = ServerSocketChannel.open() + serverChannel.configureBlocking(false) + try { + serverChannel.socket.bind(socketAddress) + info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) + } catch { + case e: SocketException => + throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) + } + serverChannel + } /* * Accept a new connection http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5754676..962b65f 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,7 +20,6 @@ package kafka.server import java.util.Properties import kafka.message.Message import kafka.consumer.ConsumerConfig -import java.net.InetAddress import kafka.utils.{VerifiableProperties, ZKConfig, Utils} /** @@ -56,8 +55,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the port to listen and accept connections on */ val port: Int = props.getInt("port", 6667) - /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */ - val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress) + /* hostname of broker. If this is set, it will only bind to this address. 
If this is not set, + * it will bind to all interfaces, and publish one to ZK */ + val hostName: String = props.getString("hostname", null) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024) http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index d444d22..ae35e4f 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -62,6 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager.startup() socketServer = new SocketServer(config.brokerId, + config.hostName, config.port, config.numNetworkThreads, config.numQueuedRequests, http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaZooKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index e1c11f2..42f8239 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -21,6 +21,7 @@ import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import kafka.common._ +import java.net.InetAddress /** @@ -41,8 +42,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { } private def registerBrokerInZk() { - info("Registering broker " + brokerIdPath) - val hostName = config.hostName + val hostName = + if(config.hostName == null || config.hostName.trim.isEmpty) + InetAddress.getLocalHost.getCanonicalHostName + else + config.hostName 
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 358c4fd..eabedd0 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -189,7 +189,7 @@ object ZkUtils extends Logging { case e: ZkNodeExistsException => throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.") } - info("Registering broker " + brokerIdPath + " succeeded with " + broker) + info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 3b5ec7f..7395cbc 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -34,6 +34,7 @@ import kafka.message.ByteBufferMessageSet class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, + host = null, port = TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50,