Updated Branches:
  refs/heads/trunk 90ce7ef21 -> 625e61396

add inter-node compression
patch by Marcus Eriksson; reviewed by jbellis for CASSANDRA-3127


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/625e6139
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/625e6139
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/625e6139

Branch: refs/heads/trunk
Commit: 625e61396b9e91d86ca989761c43da726f2e71b2
Parents: 90ce7ef
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Tue Jun 5 21:27:44 2012 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Tue Jun 5 21:29:21 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    7 +++
 src/java/org/apache/cassandra/config/Config.java   |   13 ++++-
 .../cassandra/config/DatabaseDescriptor.java       |    5 ++
 .../cassandra/net/IncomingTcpConnection.java       |   17 ++++---
 .../cassandra/net/OutboundTcpConnection.java       |   40 +++++++++++++--
 .../cassandra/streaming/StreamInSession.java       |    4 +-
 7 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8c16c35..44c0b8a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * add inter-node message compression (CASSANDRA-3127)
  * enforce 1m min keycache for auto (CASSANDRA-4306)
  * remove COPP (CASSANDRA-2479)
  * Track tombstone expiration and compact when tombstone content is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index eb01f38..6969095 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -553,3 +553,10 @@ encryption_options:
     # algorithm: SunX509
     # store_type: JKS
     # cipher_suites: 
[TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
+
+# internode_compression controls whether traffic between nodes is
+# compressed.
+# can be:  all  - all traffic is compressed
+#          dc   - traffic between different datacenters is compressed
+#          none - nothing is compressed.
+internode_compression: all

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 0a18bf8..8a24400 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -112,6 +112,8 @@ public class Config
 
     public EncryptionOptions encryption_options = new EncryptionOptions();
 
+    public InternodeCompression internode_compression = 
InternodeCompression.none;
+
     public Integer index_interval = 128;
 
     public Double flush_largest_memtables_at = 1.0;
@@ -158,12 +160,19 @@ public class Config
         loadYaml = value;
     }
 
-    public static enum CommitLogSync {
+    public static enum CommitLogSync
+    {
         periodic,
         batch
     }
 
-    public static enum DiskAccessMode {
+    public static enum InternodeCompression
+    {
+        all, none, dc
+    }
+
+    public static enum DiskAccessMode
+    {
         auto,
         mmap,
         mmap_index_only,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 7aaf0b2..7d71113 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1059,4 +1059,9 @@ public class DatabaseDescriptor
     {
         return conf.populate_io_cache_on_flush;
     }
+
+    public static Config.InternodeCompression internodeCompression()
+    {
+        return conf.internode_compression;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java 
b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 743b078..620fbdb 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -26,6 +26,7 @@ import java.net.Socket;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyInputStream;
 
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
@@ -53,17 +54,15 @@ public class IncomingTcpConnection extends Thread
     @Override
     public void run()
     {
-        DataInputStream input;
-        boolean isStream;
-        int version;
         try
         {
             // determine the connection type to decide whether to buffer
-            input = new DataInputStream(socket.getInputStream());
+            DataInputStream input = new 
DataInputStream(socket.getInputStream());
             MessagingService.validateMagic(input.readInt());
             int header = input.readInt();
-            isStream = MessagingService.getBits(header, 3, 1) == 1;
-            version = MessagingService.getBits(header, 15, 8);
+            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
+            boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+            int version = MessagingService.getBits(header, 15, 8);
             if (isStream)
             {
                 if (version == MessagingService.current_version)
@@ -97,7 +96,11 @@ public class IncomingTcpConnection extends Thread
             }
             Gossiper.instance.setVersion(from, version);
             logger.debug("set version for {} to {}", from, version);
-
+            if (compressed)
+            {
+                logger.debug("Upgrading incoming connection to be compressed");
+                input = new DataInputStream(new SnappyInputStream(input));
+            }
             // loop to get the next message.
             while (true)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a5cf79d..76f496c 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.Socket;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -27,10 +28,11 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyOutputStream;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class OutboundTcpConnection extends Thread
 {
@@ -51,13 +53,23 @@ public class OutboundTcpConnection extends Thread
     private Socket socket;
     private volatile long completed;
     private final AtomicLong dropped = new AtomicLong();
+    private boolean isUpgraded = false;
+    private boolean writeToLocalDC;
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
         super("WRITE-" + pool.endPoint());
+        this.writeToLocalDC = isSameDC(pool.endPoint());
         this.poolReference = pool;
     }
 
+    private static boolean isSameDC(InetAddress targetHost)
+    {
+        String remoteDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
+        String thisDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress());
+        return remoteDC.equals(thisDC);
+    }
+
     public void enqueue(MessageOut<?> message, String id)
     {
         expireMessages();
@@ -137,6 +149,15 @@ public class OutboundTcpConnection extends Thread
         return dropped.get();
     }
 
+    private boolean shouldUpgradeConnection()
+    {
+        if(Gossiper.instance.getVersion(poolReference.endPoint()) >= 
MessagingService.current_version &&
+                (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.all ||
+                (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.dc && !writeToLocalDC)))
+            return true;
+        return false;
+    }
+
     private void writeConnected(MessageOut<?> message, String id)
     {
         try
@@ -147,6 +168,13 @@ public class OutboundTcpConnection extends Thread
             {
                 out.flush();
             }
+            if(!isUpgraded && shouldUpgradeConnection())
+            {
+                out.flush();
+                logger.debug("Upgrading OutputStream to be compressed");
+                out = new DataOutputStream(new SnappyOutputStream(new 
BufferedOutputStream(socket.getOutputStream())));
+                isUpgraded = true;
+            }
         }
         catch (Exception e)
         {
@@ -161,10 +189,10 @@ public class OutboundTcpConnection extends Thread
 
     public void write(MessageOut<?> message, String id, DataOutputStream out) 
throws IOException
     {
-        write(message, id, out, 
Gossiper.instance.getVersion(poolReference.endPoint()));
+        write(message, id, out, 
Gossiper.instance.getVersion(poolReference.endPoint()), 
shouldUpgradeConnection());
     }
 
-    public static void write(MessageOut message, String id, DataOutputStream 
out, int version) throws IOException
+    public static void write(MessageOut message, String id, DataOutputStream 
out, int version, boolean compressionEnabled) throws IOException
     {
         /*
          Setting up the protocol header. This is 4 bytes long
@@ -180,7 +208,7 @@ public class OutboundTcpConnection extends Thread
         // Setting up the serializer bit
         header |= MessagingService.serializerType.ordinal();
         // set compression bit.
-        if (false)
+        if (compressionEnabled)
             header |= 4;
         // Setting up the version bit
         header |= (version << 8);
@@ -216,6 +244,7 @@ public class OutboundTcpConnection extends Thread
 
     private boolean connect()
     {
+        isUpgraded = false;
         if (logger.isDebugEnabled())
             logger.debug("attempting to connect to " + 
poolReference.endPoint());
         long start = System.currentTimeMillis();
@@ -244,6 +273,9 @@ public class OutboundTcpConnection extends Thread
                 }
             }
         }
+        // nodes can change dc during the lifetime of this 
OutboundTcpConnection, but we cannot upgrade a
+        // connection mid-flight, so it is safe to set this here and not change it 
until the next connect()
+        this.writeToLocalDC = isSameDC(poolReference.endPoint());
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java 
b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 5d61a16..958924d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -171,7 +171,7 @@ public class StreamInSession extends AbstractStreamSession
         OutboundTcpConnection.write(message,
                                     String.valueOf(getSessionId()),
                                     out,
-                                    Gossiper.instance.getVersion(getHost()));
+                                    Gossiper.instance.getVersion(getHost()), 
false);
         out.flush();
     }
 
@@ -222,7 +222,7 @@ public class StreamInSession extends AbstractStreamSession
                     OutboundTcpConnection.write(reply.createMessage(),
                                                 context.right.toString(),
                                                 new 
DataOutputStream(socket.getOutputStream()),
-                                                
Gossiper.instance.getVersion(getHost()));
+                                                
Gossiper.instance.getVersion(getHost()), false);
                 else
                     logger.debug("No socket to reply to {} with!", getHost());
             }

Reply via email to