Repository: kafka Updated Branches: refs/heads/trunk 0cee0c321 -> 430bf56cd
KAFKA-724; Allow automatic socket.send.buffer from operating system in SocketServer If socket.receive.buffer.bytes/socket.send.buffer.bytes are set to -1, use the OS defaults. Author: Joshi <[email protected]> Author: Rekha Joshi <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1469 from rekhajoshm/KAFKA-724-rebased Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/430bf56c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/430bf56c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/430bf56c Branch: refs/heads/trunk Commit: 430bf56cdfa32baffad21bb37dd50c080491dc03 Parents: 0cee0c3 Author: Joshi <[email protected]> Authored: Tue Jun 7 00:05:33 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Jun 7 00:05:33 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/CommonClientConfigs.java | 4 ++-- core/src/main/scala/kafka/network/SocketServer.scala | 9 ++++++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 750b8a1..3327815 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -43,10 +43,10 @@ public class CommonClientConfigs { public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."; public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; - public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data."; + public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used."; public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; - public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data."; + public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used."; public static final String CLIENT_ID_CONFIG = "client.id"; public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging."; http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b757abd..f00dd7b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selector => KSelector} +import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selectable, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -304,7 +304,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint, new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) - serverChannel.socket().setReceiveBufferSize(recvBufferSize) + if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + serverChannel.socket().setReceiveBufferSize(recvBufferSize) + try { serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostString, serverChannel.socket.getLocalPort)) @@ -326,7 +328,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint, socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) - socketChannel.socket().setSendBufferSize(sendBufferSize) + if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + socketChannel.socket().setSendBufferSize(sendBufferSize) debug("Accepted connection from %s on %s. sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" .format(socketChannel.socket.getInetAddress, socketChannel.socket.getLocalSocketAddress, http://git-wip-us.apache.org/repos/asf/kafka/blob/430bf56c/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ca66f9d..f9a12a9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -396,8 +396,8 @@ object KafkaConfig { val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + " If this is not set, the value for `listeners` will be used." - val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets" - val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets" + val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." + val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request" val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address" val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"
