Author: jbellis
Date: Sat Sep 18 10:01:40 2010
New Revision: 998432

URL: http://svn.apache.org/viewvc?rev=998432&view=rev
Log:
add options to configure Thrift socket keepalive and buffer sizes
patch by Erik Onnen and jbellis for CASSANDRA-1424

Added:
    
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sat Sep 18 10:01:40 2010
@@ -76,6 +76,7 @@
  * fix races dealing with adding/dropping keyspaces and column families in
    rapid succession (CASSANDRA-1477)
  * clean up of Streaming system (CASSANDRA-1503, 1504, 1506)
+ * add options to configure Thrift socket keepalive and buffer sizes 
(CASSANDRA-1426)
 
 
 0.7-beta1

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Sat Sep 18 10:01:40 2010
@@ -115,6 +115,13 @@ rpc_address: localhost
 # port for Thrift to listen on
 rpc_port: 9160
 
+# enable or disable keepalive on rpc connections
+rpc_keepalive: true
+
+# uncomment to set socket buffer sizes on rpc connections
+# rpc_send_buff_size_in_bytes:
+# rpc_recv_buff_size_in_bytes:
+
 # Frame size for thrift (maximum field length).
 # 0 disables TFramedTransport in favor of TSocket. This option
 # is deprecated; we strongly recommend using Framed mode.

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Sat Sep 18 
10:01:40 2010
@@ -63,6 +63,9 @@ public class Config
     
     public String rpc_address;
     public Integer rpc_port = 9160;
+    public Boolean rpc_keepalive = true;
+    public Integer rpc_send_buff_size_in_bytes;
+    public Integer rpc_recv_buff_size_in_bytes;
 
     public Integer thrift_max_message_length_in_mb = 16;
     public Integer thrift_framed_transport_size_in_mb = 15;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Sat Sep 18 10:01:40 2010
@@ -257,7 +257,7 @@ public class DatabaseDescriptor
             /* Local IP or hostname to bind RPC server to */
             if (conf.rpc_address != null)
                 rpcAddress = InetAddress.getByName(conf.rpc_address);
-            
+
             if (conf.thrift_framed_transport_size_in_mb > 0 && 
conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb)
             {
                 throw new 
ConfigurationException("thrift_max_message_length_in_mb must be greater than 
thrift_framed_transport_size_in_mb when using TFramedTransport");
@@ -1028,6 +1028,21 @@ public class DatabaseDescriptor
         return rpcAddress;
     }
 
+    public static boolean getRpcKeepAlive()
+    {
+        return conf.rpc_keepalive;
+    }
+
+    public static Integer getRpcSendBufferSize()
+    {
+        return conf.rpc_send_buff_size_in_bytes;
+    }
+
+    public static Integer getRpcRecvBufferSize()
+    {
+        return conf.rpc_recv_buff_size_in_bytes;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=998432&r1=998431&r2=998432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
Sat Sep 18 10:01:40 2010
@@ -52,16 +52,20 @@ public class CassandraDaemon extends org
     protected void setup() throws IOException
     {
         super.setup();                
+
         // now we start listening for clients
         final CassandraServer cassandraServer = new CassandraServer();
         Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
 
         // Transport
         TServerSocket tServerSocket = null;
-        
+
         try
         {
-            tServerSocket = new TServerSocket(new 
InetSocketAddress(listenAddr, listenPort));
+            tServerSocket = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
+                                                    
DatabaseDescriptor.getRpcKeepAlive(),
+                                                    
DatabaseDescriptor.getRpcSendBufferSize(),
+                                                    
DatabaseDescriptor.getRpcRecvBufferSize());
         }
         catch (TTransportException e)
         {

Added: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=998432&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java 
(added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java 
Sat Sep 18 10:01:40 2010
@@ -0,0 +1,84 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Extends Thrift's TServerSocket to allow customization of various desirable
+ * TCP properties.
+ */
+public class TCustomServerSocket extends TServerSocket
+{
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TCustomServerSocket.class);
+
+    private final boolean keepAlive;
+    private final Integer sendBufferSize;
+    private final Integer recvBufferSize;
+
+    /**
+     * Allows fine-tuning of the server socket including keep-alive, reuse of 
addresses, send and receive buffer sizes.
+     * @param bindAddr
+     * @param keepAlive
+     * @param sendBufferSize
+     * @param recvBufferSize
+     * @throws TTransportException
+     */
+    public TCustomServerSocket(InetSocketAddress bindAddr, boolean keepAlive, 
Integer sendBufferSize, Integer recvBufferSize)
+    throws TTransportException
+    {
+        super(bindAddr);
+        this.keepAlive = keepAlive;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+    }
+
+    @Override
+    protected TSocket acceptImpl() throws TTransportException
+    {
+        TSocket tsocket = super.acceptImpl();
+        Socket socket = tsocket.getSocket();
+
+        try
+        {
+            socket.setKeepAlive(this.keepAlive);
+        }
+        catch (SocketException se)
+        {
+            logger.warn("Failed to set keep-alive on Thrift socket.", se);
+        }
+
+        if (this.sendBufferSize != null)
+        {
+            try
+            {
+                socket.setSendBufferSize(this.sendBufferSize.intValue());
+            }
+            catch (SocketException se)
+            {
+                logger.warn("Failed to set send buffer size on Thrift 
socket.", se);
+            }
+        }
+
+        if (this.recvBufferSize != null)
+        {
+            try
+            {
+                socket.setReceiveBufferSize(this.recvBufferSize.intValue());
+            }
+            catch (SocketException se)
+            {
+                logger.warn("Failed to set receive buffer size on Thrift 
socket.", se);
+            }
+        }
+
+        return tsocket;
+    }
+}


Reply via email to