Author: jbellis
Date: Sat Sep 18 10:01:40 2010
New Revision: 998432
URL: http://svn.apache.org/viewvc?rev=998432&view=rev
Log:
add options to configure Thrift socket keepalive and buffer sizes
patch by Erik Onnen and jbellis for CASSANDRA-1426
Added:
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sat Sep 18 10:01:40 2010
@@ -76,6 +76,7 @@
* fix races dealing with adding/dropping keyspaces and column families in
rapid succession (CASSANDRA-1477)
* clean up of Streaming system (CASSANDRA-1503, 1504, 1506)
+ * add options to configure Thrift socket keepalive and buffer sizes (CASSANDRA-1426)
0.7-beta1
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Sat Sep 18 10:01:40 2010
@@ -115,6 +115,13 @@ rpc_address: localhost
# port for Thrift to listen on
rpc_port: 9160
+# enable or disable keepalive on rpc connections
+rpc_keepalive: true
+
+# uncomment to set socket buffer sizes on rpc connections
+# rpc_send_buff_size_in_bytes:
+# rpc_recv_buff_size_in_bytes:
+
# Frame size for thrift (maximum field length).
# 0 disables TFramedTransport in favor of TSocket. This option
# is deprecated; we strongly recommend using Framed mode.
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Sat Sep 18 10:01:40 2010
@@ -63,6 +63,9 @@ public class Config
public String rpc_address;
public Integer rpc_port = 9160;
+ public Boolean rpc_keepalive = true;
+ public Integer rpc_send_buff_size_in_bytes;
+ public Integer rpc_recv_buff_size_in_bytes;
public Integer thrift_max_message_length_in_mb = 16;
public Integer thrift_framed_transport_size_in_mb = 15;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Sep 18 10:01:40 2010
@@ -257,7 +257,7 @@ public class DatabaseDescriptor
/* Local IP or hostname to bind RPC server to */
if (conf.rpc_address != null)
rpcAddress = InetAddress.getByName(conf.rpc_address);
-
+
if (conf.thrift_framed_transport_size_in_mb > 0 && conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb)
{
throw new ConfigurationException("thrift_max_message_length_in_mb must be greater than thrift_framed_transport_size_in_mb when using TFramedTransport");
@@ -1028,6 +1028,21 @@ public class DatabaseDescriptor
return rpcAddress;
}
+ public static boolean getRpcKeepAlive()
+ {
+ return conf.rpc_keepalive;
+ }
+
+ public static Integer getRpcSendBufferSize()
+ {
+ return conf.rpc_send_buff_size_in_bytes;
+ }
+
+ public static Integer getRpcRecvBufferSize()
+ {
+ return conf.rpc_recv_buff_size_in_bytes;
+ }
+
public static double getCommitLogSyncBatchWindow()
{
return conf.commitlog_sync_batch_window_in_ms;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Sat Sep 18 10:01:40 2010
@@ -52,16 +52,20 @@ public class CassandraDaemon extends org
protected void setup() throws IOException
{
super.setup();
+
// now we start listening for clients
final CassandraServer cassandraServer = new CassandraServer();
Cassandra.Processor processor = new Cassandra.Processor(cassandraServer);
// Transport
TServerSocket tServerSocket = null;
-
+
try
{
- tServerSocket = new TServerSocket(new InetSocketAddress(listenAddr, listenPort));
+ tServerSocket = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
}
catch (TTransportException e)
{
Added:
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=998432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Sat Sep 18 10:01:40 2010
@@ -0,0 +1,84 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Extends Thrift's TServerSocket to allow customization of various desirable
+ * TCP properties.
+ */
+public class TCustomServerSocket extends TServerSocket
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(TCustomServerSocket.class);
+
+ private final boolean keepAlive;
+ private final Integer sendBufferSize;
+ private final Integer recvBufferSize;
+
+ /**
+ * Allows fine-tuning of the server socket including keep-alive, reuse of addresses, send and receive buffer sizes.
+ * @param bindAddr
+ * @param keepAlive
+ * @param sendBufferSize
+ * @param recvBufferSize
+ * @throws TTransportException
+ */
+ public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize)
+ throws TTransportException
+ {
+ super(bindAddr);
+ this.keepAlive = keepAlive;
+ this.sendBufferSize = sendBufferSize;
+ this.recvBufferSize = recvBufferSize;
+ }
+
+ @Override
+ protected TSocket acceptImpl() throws TTransportException
+ {
+ TSocket tsocket = super.acceptImpl();
+ Socket socket = tsocket.getSocket();
+
+ try
+ {
+ socket.setKeepAlive(this.keepAlive);
+ }
+ catch (SocketException se)
+ {
+ logger.warn("Failed to set keep-alive on Thrift socket.", se);
+ }
+
+ if (this.sendBufferSize != null)
+ {
+ try
+ {
+ socket.setSendBufferSize(this.sendBufferSize.intValue());
+ }
+ catch (SocketException se)
+ {
+ logger.warn("Failed to set send buffer size on Thrift socket.", se);
+ }
+ }
+
+ if (this.recvBufferSize != null)
+ {
+ try
+ {
+ socket.setReceiveBufferSize(this.recvBufferSize.intValue());
+ }
+ catch (SocketException se)
+ {
+ logger.warn("Failed to set receive buffer size on Thrift socket.", se);
+ }
+ }
+
+ return tsocket;
+ }
+}