Updated Branches:
  refs/heads/0.8 1976ce806 -> ede85875a

KAFKA-872; Socket server should set send/recv socket buffer sizes; reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ede85875
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ede85875
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ede85875

Branch: refs/heads/0.8
Commit: ede85875a3aea5723baf82736cc71bb1a04a6102
Parents: 1976ce8
Author: Joel Koshy <jjko...@gmail.com>
Authored: Tue Apr 23 10:40:47 2013 -0700
Committer: Joel Koshy <jjko...@gmail.com>
Committed: Tue Apr 23 10:40:47 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/network/SocketServer.scala    |   20 ++++++++++++---
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 +
 .../unit/kafka/network/SocketServerTest.scala      |    2 +
 3 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 648d936..5a44c28 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -37,6 +37,8 @@ class SocketServer(val brokerId: Int,
                    val port: Int,
                    val numProcessorThreads: Int,
                    val maxQueuedRequests: Int,
+                   val sendBufferSize: Int,
+                   val recvBufferSize: Int,
                    val maxRequestSize: Int = Int.MaxValue) extends Logging {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
@@ -56,7 +58,7 @@ class SocketServer(val brokerId: Int,
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
     // start accepting connections
-    this.acceptor = new Acceptor(host, port, processors)
+    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
     info("started")
@@ -128,7 +130,8 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor],
+                              val sendBufferSize: Int, val recvBufferSize: Int) extends AbstractServerThread {
   val serverChannel = openServerSocket(host, port)
 
   /**
@@ -192,10 +195,19 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
-    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
-    debug("Accepted connection from " + socketChannel.socket.getInetAddress() + " on " + socketChannel.socket.getLocalSocketAddress)
+    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
+    serverSocketChannel.socket().setReceiveBufferSize(recvBufferSize)
+
+    val socketChannel = serverSocketChannel.accept()
     socketChannel.configureBlocking(false)
     socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setSendBufferSize(sendBufferSize)
+
+    debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
+          .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress,
+                  socketChannel.socket.getSendBufferSize, sendBufferSize,
+                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
+
     processor.accept(socketChannel)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7298ccb..b4a57c6 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -66,6 +66,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.port,
                                     config.numNetworkThreads,
                                     config.queuedMaxRequests,
+                                    config.socketSendBufferBytes,
+                                    config.socketReceiveBufferBytes,
                                     config.socketRequestMaxBytes)
 
     socketServer.startup

http://git-wip-us.apache.org/repos/asf/kafka/blob/ede85875/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b347e66..94b5a2a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,8 @@ class SocketServerTest extends JUnitSuite {
                                               port = kafka.utils.TestUtils.choosePort,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,
+                                              sendBufferSize = 300000,
+                                              recvBufferSize = 300000,
                                               maxRequestSize = 50)
   server.startup()
 

Reply via email to