Updated Branches: refs/heads/trunk 90ce7ef21 -> 625e61396
add inter-node compression patch by Marcus Eriksson; reviewed by jbellis for CASSANDRA-3127 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/625e6139 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/625e6139 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/625e6139 Branch: refs/heads/trunk Commit: 625e61396b9e91d86ca989761c43da726f2e71b2 Parents: 90ce7ef Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Jun 5 21:27:44 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Jun 5 21:29:21 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 7 +++ src/java/org/apache/cassandra/config/Config.java | 13 ++++- .../cassandra/config/DatabaseDescriptor.java | 5 ++ .../cassandra/net/IncomingTcpConnection.java | 17 ++++--- .../cassandra/net/OutboundTcpConnection.java | 40 +++++++++++++-- .../cassandra/streaming/StreamInSession.java | 4 +- 7 files changed, 72 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8c16c35..44c0b8a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * add inter-node message compression (CASSANDRA-3127) * enforce 1m min keycache for auto (CASSANDRA-4306) * remove COPP (CASSANDRA-2479) * Track tombstone expiration and compact when tombstone content is http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index eb01f38..6969095 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -553,3 +553,10 @@ encryption_options: # algorithm: SunX509 # 
store_type: JKS # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA] + +# internode_compression controls whether traffic between nodes is +# compressed. +# can be: all - all traffic is compressed +# dc - traffic between different datacenters is compressed +# none - nothing is compressed. +internode_compression: all http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0a18bf8..8a24400 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -112,6 +112,8 @@ public class Config public EncryptionOptions encryption_options = new EncryptionOptions(); + public InternodeCompression internode_compression = InternodeCompression.none; + public Integer index_interval = 128; public Double flush_largest_memtables_at = 1.0; @@ -158,12 +160,19 @@ public class Config loadYaml = value; } - public static enum CommitLogSync { + public static enum CommitLogSync + { periodic, batch } - public static enum DiskAccessMode { + public static enum InternodeCompression + { + all, none, dc + } + + public static enum DiskAccessMode + { auto, mmap, mmap_index_only, http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7aaf0b2..7d71113 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1059,4 +1059,9 @@ public class DatabaseDescriptor { return conf.populate_io_cache_on_flush; } + + public static 
Config.InternodeCompression internodeCompression() + { + return conf.internode_compression; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 743b078..620fbdb 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -26,6 +26,7 @@ import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.FastByteArrayInputStream; @@ -53,17 +54,15 @@ public class IncomingTcpConnection extends Thread @Override public void run() { - DataInputStream input; - boolean isStream; - int version; try { // determine the connection type to decide whether to buffer - input = new DataInputStream(socket.getInputStream()); + DataInputStream input = new DataInputStream(socket.getInputStream()); MessagingService.validateMagic(input.readInt()); int header = input.readInt(); - isStream = MessagingService.getBits(header, 3, 1) == 1; - version = MessagingService.getBits(header, 15, 8); + boolean compressed = MessagingService.getBits(header, 2, 1) == 1; + boolean isStream = MessagingService.getBits(header, 3, 1) == 1; + int version = MessagingService.getBits(header, 15, 8); if (isStream) { if (version == MessagingService.current_version) @@ -97,7 +96,11 @@ public class IncomingTcpConnection extends Thread } Gossiper.instance.setVersion(from, version); logger.debug("set version for {} to {}", from, version); - + if (compressed) + { + logger.debug("Upgrading incoming connection to be compressed"); + input = new DataInputStream(new SnappyInputStream(input)); + } // loop to get the next 
message. while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index a5cf79d..76f496c 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -20,6 +20,7 @@ package org.apache.cassandra.net; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -27,10 +28,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xerial.snappy.SnappyOutputStream; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.utils.FBUtilities; public class OutboundTcpConnection extends Thread { @@ -51,13 +53,23 @@ public class OutboundTcpConnection extends Thread private Socket socket; private volatile long completed; private final AtomicLong dropped = new AtomicLong(); + private boolean isUpgraded = false; + private boolean writeToLocalDC; public OutboundTcpConnection(OutboundTcpConnectionPool pool) { super("WRITE-" + pool.endPoint()); + this.writeToLocalDC = isSameDC(pool.endPoint()); this.poolReference = pool; } + private static boolean isSameDC(InetAddress targetHost) + { + String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); + String thisDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress()); + return remoteDC.equals(thisDC); + } + public void enqueue(MessageOut<?> message, 
String id) { expireMessages(); @@ -137,6 +149,15 @@ public class OutboundTcpConnection extends Thread return dropped.get(); } + private boolean shouldUpgradeConnection() + { + if(Gossiper.instance.getVersion(poolReference.endPoint()) >= MessagingService.current_version && + (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all || + (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !writeToLocalDC))) + return true; + return false; + } + private void writeConnected(MessageOut<?> message, String id) { try @@ -147,6 +168,13 @@ public class OutboundTcpConnection extends Thread { out.flush(); } + if(!isUpgraded && shouldUpgradeConnection()) + { + out.flush(); + logger.debug("Upgrading OutputStream to be compressed"); + out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream()))); + isUpgraded = true; + } } catch (Exception e) { @@ -161,10 +189,10 @@ public class OutboundTcpConnection extends Thread public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException { - write(message, id, out, Gossiper.instance.getVersion(poolReference.endPoint())); + write(message, id, out, Gossiper.instance.getVersion(poolReference.endPoint()), shouldUpgradeConnection()); } - public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException + public static void write(MessageOut message, String id, DataOutputStream out, int version, boolean compressionEnabled) throws IOException { /* Setting up the protocol header. This is 4 bytes long @@ -180,7 +208,7 @@ public class OutboundTcpConnection extends Thread // Setting up the serializer bit header |= MessagingService.serializerType.ordinal(); // set compression bit. 
- if (false) + if (compressionEnabled) header |= 4; // Setting up the version bit header |= (version << 8); @@ -216,6 +244,7 @@ public class OutboundTcpConnection extends Thread private boolean connect() { + isUpgraded = false; if (logger.isDebugEnabled()) logger.debug("attempting to connect to " + poolReference.endPoint()); long start = System.currentTimeMillis(); @@ -244,6 +273,9 @@ public class OutboundTcpConnection extends Thread } } } + // nodes can change dc during the lifetime of this OutboundTcpConnection, but we cannot upgrade a + // connection mid-flight, so it is safe to set it here and not change it until the next connect() + this.writeToLocalDC = isSameDC(poolReference.endPoint()); return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/625e6139/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index 5d61a16..958924d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -171,7 +171,7 @@ public class StreamInSession extends AbstractStreamSession OutboundTcpConnection.write(message, String.valueOf(getSessionId()), out, - Gossiper.instance.getVersion(getHost())); + Gossiper.instance.getVersion(getHost()), false); out.flush(); } @@ -222,7 +222,7 @@ public class StreamInSession extends AbstractStreamSession OutboundTcpConnection.write(reply.createMessage(), context.right.toString(), new DataOutputStream(socket.getOutputStream()), - Gossiper.instance.getVersion(getHost())); + Gossiper.instance.getVersion(getHost()), false); else logger.debug("No socket to reply to {} with!", getHost()); }