Author: brandonwilliams Date: Wed Oct 26 20:56:31 2011 New Revision: 1189448
URL: http://svn.apache.org/viewvc?rev=1189448&view=rev Log: Allow encryption only between datacenters. Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-2802 Modified: cassandra/branches/cassandra-1.0/CHANGES.txt cassandra/branches/cassandra-1.0/conf/cassandra.yaml cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java Modified: cassandra/branches/cassandra-1.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-1.0/CHANGES.txt Wed Oct 26 20:56:31 2011 @@ -56,6 +56,7 @@ Merged from 0.8: (CASSANDRA-3351) * remove incorrect optimization from slice read path (CASSANDRA-3390) * Fix race in AntiEntropyService (CASSANDRA-3400) + * allow encryption only between datacenters (CASSANDRA-2802) 1.0.0-final Modified: cassandra/branches/cassandra-1.0/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/conf/cassandra.yaml?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/conf/cassandra.yaml (original) +++ cassandra/branches/cassandra-1.0/conf/cassandra.yaml Wed Oct 26 20:56:31 2011 @@ -164,6 +164,10 @@ sliced_buffer_size_in_kb: 64 # TCP port, for commands and data storage_port: 7000 +# SSL port, for encrypted communication. Unused unless enabled in +# encryption_options +ssl_storage_port: 7001 + # Address to bind to and tell other Cassandra nodes to connect to. You # _must_ change this if you want multiple nodes to be able to # communicate! @@ -403,7 +407,10 @@ index_interval: 128 # users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher # suite for authentication, key exchange and encryption of the actual data transfers. # NOTE: No custom encryption options are enabled at the moment -# The available internode options are : all, none +# The available internode options are : all, none, dc, rack +# +# If set to dc cassandra will encrypt the traffic between the DCs +# If set to rack cassandra will encrypt the traffic between the racks # # The passwords used in these options must match the passwords used when generating # the keystore and truststore. For instructions on generating these files, see: Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/Config.java Wed Oct 26 20:56:31 2011 @@ -62,6 +62,7 @@ public class Config public Integer sliced_buffer_size_in_kb = 64; public Integer storage_port = 7000; + public Integer ssl_storage_port = 7001; public String listen_address; public String broadcast_address; Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Oct 26 20:56:31 2011 @@ -632,6 +632,11 @@ public class DatabaseDescriptor return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString())); } + public static int getSSLStoragePort() + { + return Integer.parseInt(System.getProperty("cassandra.ssl_storage_port", conf.ssl_storage_port.toString())); + } + public static int getRpcPort() { return Integer.parseInt(System.getProperty("cassandra.rpc_port", conf.rpc_port.toString())); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/config/EncryptionOptions.java Wed Oct 26 20:56:31 2011 @@ -33,6 +33,8 @@ public class EncryptionOptions public static enum InternodeEncryption { all, - none + none, + dc, + rack } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/locator/Ec2Snitch.java Wed Oct 26 20:56:31 2011 @@ -44,6 +44,8 @@ public class Ec2Snitch extends AbstractN { protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class); protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"; + private static final String DEFAULT_DC = "UNKNOWN-DC"; + private static final String DEFAULT_RACK = "UNKNOWN-RACK"; protected String ec2zone; protected String ec2region; @@ -83,14 +85,20 @@ public class Ec2Snitch extends AbstractN { if (endpoint.equals(FBUtilities.getBroadcastAddress())) return ec2zone; - return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value; + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (null == state || null == state.getApplicationState(ApplicationState.RACK)) + return DEFAULT_RACK; + return state.getApplicationState(ApplicationState.RACK).value; } public String getDatacenter(InetAddress endpoint) { if (endpoint.equals(FBUtilities.getBroadcastAddress())) return ec2region; - return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value; + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (null == state || null == state.getApplicationState(ApplicationState.DC)) + return DEFAULT_DC; + return state.getApplicationState(ApplicationState.DC).value; } @Override Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Wed Oct 26 20:56:31 2011 @@ -35,6 +35,8 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Function; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +90,7 @@ public final class MessagingService impl private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class); private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000; - private SocketThread socketThread; + private List<SocketThread> socketThreads = Lists.newArrayList(); private final SimpleCondition listenGate; /** @@ -236,41 +238,45 @@ public final class MessagingService impl */ public void listen(InetAddress localEp) throws IOException, ConfigurationException { - socketThread = new SocketThread(getServerSocket(localEp), "ACCEPT-" + localEp); - socketThread.start(); + for (ServerSocket ss: getServerSocket(localEp)) + { + SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp); + th.start(); + socketThreads.add(th); + } listenGate.signalAll(); } - private ServerSocket getServerSocket(InetAddress localEp) throws IOException, ConfigurationException + private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException { - final ServerSocket ss; - if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption == EncryptionOptions.InternodeEncryption.all) + final List<ServerSocket> ss = new ArrayList<ServerSocket>(); + if (DatabaseDescriptor.getEncryptionOptions() != null && DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none) { - ss = SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getStoragePort()); + ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort())); // setReuseAddress happens in the factory. - logger_.info("Starting Encrypted Messaging Service on port {}", DatabaseDescriptor.getStoragePort()); + logger_.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort()); } - else + + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + ServerSocket socket = serverChannel.socket(); + socket.setReuseAddress(true); + InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); + try { - ServerSocketChannel serverChannel = ServerSocketChannel.open(); - ss = serverChannel.socket(); - ss.setReuseAddress(true); - InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); - try - { - ss.bind(address); - } - catch (BindException e) - { - if (e.getMessage().contains("in use")) - throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services"); - else if (e.getMessage().contains("Cannot assign requested address")) - throw new ConfigurationException("Unable to bind to address " + address + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2"); - else - throw e; - } - logger_.info("Starting Messaging Service on {}", address); + socket.bind(address); + } + catch (BindException e) + { + if (e.getMessage().contains("in use")) + throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services"); + else if (e.getMessage().contains("Cannot assign requested address")) + throw new ConfigurationException("Unable to bind to address " + address + + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2"); + else + throw e; } + logger_.info("Starting Messaging Service on port {}", DatabaseDescriptor.getStoragePort()); + ss.add(socket); return ss; } @@ -453,7 +459,7 @@ public final class MessagingService impl public void stream(StreamHeader header, InetAddress to) { /* Streaming asynchronously on streamExector_ threads. */ - streamExecutor_.execute(new FileStreamTask(header, to, DatabaseDescriptor.getEncryptionOptions())); + streamExecutor_.execute(new FileStreamTask(header, to)); } /** The count of active outbound stream tasks. */ @@ -485,7 +491,8 @@ public final class MessagingService impl try { - socketThread.close(); + for (SocketThread th : socketThreads) + th.close(); } catch (IOException e) { Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Oct 26 20:56:31 2011 @@ -24,7 +24,6 @@ package org.apache.cassandra.net; import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -34,9 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -51,23 +47,17 @@ public class OutboundTcpConnection exten MessagingService.version_); private static final int OPEN_RETRY_DELAY = 100; // ms between retries - - private InetAddress endpoint; private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>(); - private DataOutputStream out; + private final OutboundTcpConnectionPool poolReference; + private DataOutputStream out; private Socket socket; private long completedCount; - public OutboundTcpConnection(InetAddress remoteEp) - { - super("WRITE-" + remoteEp); - setEndPoint(remoteEp); - } - - public void setEndPoint(InetAddress remoteEndPoint) + public OutboundTcpConnection(OutboundTcpConnectionPool pool) { - this.endpoint = remoteEndPoint; + super("WRITE-" + pool.endPoint()); + this.poolReference = pool; } public void enqueue(Message message, String id) @@ -131,7 +121,7 @@ public class OutboundTcpConnection exten catch (IOException e) { if (logger.isDebugEnabled()) - logger.debug("error writing to " + endpoint, e); + logger.debug("error writing to " + poolReference.endPoint(), e); disconnect(); } } @@ -185,7 +175,7 @@ public class OutboundTcpConnection exten catch (IOException e) { if (logger.isDebugEnabled()) - logger.debug("exception closing connection to " + endpoint, e); + logger.debug("exception closing connection to " + poolReference.endPoint(), e); } out = null; socket = null; @@ -210,22 +200,13 @@ public class OutboundTcpConnection exten private boolean connect() { if (logger.isDebugEnabled()) - logger.debug("attempting to connect to " + endpoint); + logger.debug("attempting to connect to " + poolReference.endPoint()); long start = System.currentTimeMillis(); while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout()) { try { - // zero means 'bind on any available port.' - EncryptionOptions options = DatabaseDescriptor.getEncryptionOptions(); - if (options != null && options.internode_encryption == EncryptionOptions.InternodeEncryption.all) - { - socket = SSLFactory.getSocket(options, endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); - } - else { - socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); - } - + socket = poolReference.newSocket(); socket.setKeepAlive(true); socket.setTcpNoDelay(true); out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); @@ -235,7 +216,7 @@ public class OutboundTcpConnection exten { socket = null; if (logger.isTraceEnabled()) - logger.trace("unable to connect to " + endpoint, e); + logger.trace("unable to connect to " + poolReference.endPoint(), e); try { Thread.sleep(OPEN_RETRY_DELAY); Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Oct 26 20:56:31 2011 @@ -18,20 +18,32 @@ package org.apache.cassandra.net; +import java.io.IOException; import java.net.InetAddress; +import java.net.Socket; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.utils.FBUtilities; public class OutboundTcpConnectionPool { + private IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + // pointer for the real Address. + private final InetAddress id; public final OutboundTcpConnection cmdCon; public final OutboundTcpConnection ackCon; + // pointer to the reseted Address. + private InetAddress resetedEndpoint; OutboundTcpConnectionPool(InetAddress remoteEp) { - cmdCon = new OutboundTcpConnection(remoteEp); + id = remoteEp; + cmdCon = new OutboundTcpConnection(this); cmdCon.start(); - ackCon = new OutboundTcpConnection(remoteEp); + ackCon = new OutboundTcpConnection(this); ackCon.start(); } @@ -55,9 +67,46 @@ public class OutboundTcpConnectionPool public void reset(InetAddress remoteEP) { - ackCon.setEndPoint(remoteEP); - ackCon.closeSocket(); - cmdCon.setEndPoint(remoteEP); - cmdCon.closeSocket(); + resetedEndpoint = remoteEP; + reset(); + } + + public Socket newSocket() throws IOException + { + // zero means 'bind on any available port.' + if (isEncryptedChannel()) + { + return SSLFactory.getSocket(DatabaseDescriptor.getEncryptionOptions(), endPoint(), DatabaseDescriptor.getSSLStoragePort(), FBUtilities.getLocalAddress(), 0); + } + else { + return new Socket(endPoint(), DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0); + } + } + + InetAddress endPoint() + { + return resetedEndpoint == null ? id : resetedEndpoint; + } + + boolean isEncryptedChannel() + { + switch (DatabaseDescriptor.getEncryptionOptions().internode_encryption) + { + case none: + return false; // if nothing needs to be encrypted then return immediately. + case all: + break; + case dc: + if (snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + return false; + break; + case rack: + // for rack then check if the DC's are the same. + if (snitch.getRack(id).equals(snitch.getRack(FBUtilities.getBroadcastAddress())) + && snitch.getDatacenter(id).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + return false; + break; + } + return true; } } Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1189448&r1=1189447&r2=1189448&view=diff ============================================================================== --- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original) +++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/FileStreamTask.java Wed Oct 26 20:56:31 2011 @@ -22,20 +22,16 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; -import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.WrappedRunnable; @@ -60,18 +56,15 @@ public class FileStreamTask extends Wrap private Socket socket; // socket's output stream private OutputStream output; - // system encryption options if any - private final EncryptionOptions encryptionOptions; // allocate buffer to use for transfers only once private final byte[] transferBuffer = new byte[CHUNK_SIZE]; // outbound global throughput limiter private final Throttle throttle; - public FileStreamTask(StreamHeader header, InetAddress to, EncryptionOptions encryptionOptions) + public FileStreamTask(StreamHeader header, InetAddress to) { this.header = header; this.to = to; - this.encryptionOptions = encryptionOptions; this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction() { /** @return Instantaneous throughput target in bytes per millisecond. */ @@ -198,13 +191,13 @@ public class FileStreamTask extends Wrap */ private void connectAttempt() throws IOException { - bind(); int attempts = 0; while (true) { try { - connect(); + socket = MessagingService.instance().getConnectionPool(to).newSocket(); + output = socket.getOutputStream(); break; } catch (IOException e) @@ -226,22 +219,6 @@ public class FileStreamTask extends Wrap } } - protected void bind() throws IOException - { - socket = (encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.InternodeEncryption.all) - ? SSLFactory.getSocket(encryptionOptions) - : new Socket(); - - // force local binding on correctly specified interface. - socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); - } - - protected void connect() throws IOException - { - socket.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); - output = socket.getOutputStream(); - } - protected void close() throws IOException { output.close();