Updated Branches: refs/heads/0.8 1976ce806 -> ede85875a
KAFKA-872; Socket server should set send/recv socket buffer sizes; reviewed by Jun Rao. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ede85875 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ede85875 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ede85875 Branch: refs/heads/0.8 Commit: ede85875a3aea5723baf82736cc71bb1a04a6102 Parents: 1976ce8 Author: Joel Koshy <jjko...@gmail.com> Authored: Tue Apr 23 10:40:47 2013 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Tue Apr 23 10:40:47 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/network/SocketServer.scala | 20 ++++++++++++--- core/src/main/scala/kafka/server/KafkaServer.scala | 2 + .../unit/kafka/network/SocketServerTest.scala | 2 + 3 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 648d936..5a44c28 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -37,6 +37,8 @@ class SocketServer(val brokerId: Int, val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, + val sendBufferSize: Int, + val recvBufferSize: Int, val maxRequestSize: Int = Int.MaxValue) extends Logging { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime @@ -56,7 +58,7 @@ class SocketServer(val brokerId: Int, requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) // start accepting connections - this.acceptor = new Acceptor(host, port, processors) + this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize) Utils.newThread("kafka-acceptor", acceptor, false).start() acceptor.awaitStartup info("started") @@ -128,7 +130,8 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { +private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor], + val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread { val serverChannel = openServerSocket(host, port) /** @@ -192,10 +195,19 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { - val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept() - debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress) + val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] + serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize) + + val socketChannel = serverSocketChannel.accept() socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) + socketChannel.socket().setSendBufferSize(sendBufferSize) + + debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" + .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress, + socketChannel.socket.getSendBufferSize, sendBufferSize, + socketChannel.socket.getReceiveBufferSize, recvBufferSize)) + processor.accept(socketChannel) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7298ccb..b4a57c6 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -66,6 +66,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.port, config.numNetworkThreads, config.queuedMaxRequests, + config.socketSendBufferBytes, + config.socketReceiveBufferBytes, config.socketRequestMaxBytes) socketServer.startup http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b347e66..94b5a2a 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -38,6 +38,8 @@ class SocketServerTest extends JUnitSuite { port = kafka.utils.TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50, + sendBufferSize = 300000, + recvBufferSize = 300000, maxRequestSize = 50) server.startup()