Repository: cassandra Updated Branches: refs/heads/trunk 87962dcf3 -> 260846685
Node to Node encryption transitional mode patch by jasobrown; reviewed by Stefan Podkowinski for CASSANDRA-10404 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26084668 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26084668 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26084668 Branch: refs/heads/trunk Commit: 260846685b6129a324a7cb7396da135fee85ec04 Parents: 87962dc Author: Jason Brown <[email protected]> Authored: Wed Feb 15 05:41:30 2017 -0800 Committer: Jason Brown <[email protected]> Committed: Fri Nov 3 05:06:38 2017 -0700 ---------------------------------------------------------------------- NEWS.txt | 5 +- conf/cassandra.yaml | 23 +- .../org/apache/cassandra/config/Config.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 33 ++- .../cassandra/config/EncryptionOptions.java | 43 +++- .../locator/ReconnectableSnitchHelper.java | 2 +- .../apache/cassandra/net/MessagingService.java | 124 +++++++--- .../cassandra/net/async/NettyFactory.java | 117 ++++++---- .../cassandra/net/async/OptionalSslHandler.java | 67 ++++++ .../net/async/OutboundConnectionIdentifier.java | 6 + .../net/async/OutboundMessagingConnection.java | 27 +++ .../cassandra/streaming/StreamSession.java | 2 +- .../org/apache/cassandra/tools/BulkLoader.java | 2 +- .../apache/cassandra/tools/LoaderOptions.java | 6 +- .../org/apache/cassandra/transport/Client.java | 7 +- .../org/apache/cassandra/transport/Server.java | 2 +- .../cassandra/transport/SimpleClient.java | 15 +- .../org/apache/cassandra/utils/FBUtilities.java | 2 +- .../cassandra/net/MessagingServiceTest.java | 228 ++++++++++++++++--- .../cassandra/net/async/NettyFactoryTest.java | 51 ++++- .../async/OutboundMessagingConnectionTest.java | 45 ++++ .../service/ProtocolBetaVersionTest.java | 4 +- .../cassandra/transport/MessagePayloadTest.java | 4 +- .../stress/settings/SettingsTransport.java | 5 +- 
.../stress/settings/StressSettings.java | 2 +- .../cassandra/stress/util/JavaDriverClient.java | 6 +- 26 files changed, 657 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 7a133b8..09a9a7b 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -39,7 +39,7 @@ Upgrading 4.0 and the legacy tables must have been removed. See the 'Upgrading' section for version 2.2 for migration instructions. - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst - Tother things, this imply the removal of all yaml option related to thrift + other things, this implies the removal of all yaml options related to thrift ('start_rpc', rpc_port, ...). - Cassandra 4.0 removed support for any pre-3.0 format. This means you cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to @@ -67,6 +67,9 @@ Upgrading - the miniumum value for internode message timeouts is 10ms. Previously, any positive value was allowed. See cassandra.yaml entries like read_request_timeout_in_ms for more details. + - Cassandra 4.0 allows a single port to be used for both secure and insecure + connections between cassandra nodes (CASSANDRA-10404). See the yaml for + specific property changes, and see the security doc for full details. Materialized Views ------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index ef94613..e41af17 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -570,9 +570,10 @@ trickle_fsync_interval_in_kb: 10240 # For security reasons, you should not expose this port to the internet. Firewall it if needed. storage_port: 7000 -# SSL port, for encrypted communication. 
Unused unless enabled in -# encryption_options -# For security reasons, you should not expose this port to the internet. Firewall it if needed. +# SSL port, for legacy encrypted communication. This property is unused unless enabled in +# server_encryption_options (see below). As of cassandra 4.0, this property is deprecated +# as a single port can be used for either/both secure and insecure connections. +# For security reasons, you should not expose this port to the internet. Firewall it if needed. ssl_storage_port: 7001 # Address or interface to bind to and tell other Cassandra nodes to connect to. @@ -920,7 +921,7 @@ dynamic_snitch_reset_interval_in_ms: 600000 dynamic_snitch_badness_threshold: 0.1 # Enable or disable inter-node encryption -# JVM defaults for supported SSL socket protocols and cipher suites can +# JVM and netty defaults for supported SSL socket protocols and cipher suites can # be replaced using custom encryption options. This is not recommended # unless you have policies in place that dictate certain settings, or # need to disable vulnerable ciphers or protocols in case the JVM cannot @@ -928,17 +929,25 @@ dynamic_snitch_badness_threshold: 0.1 # FIPS compliant settings can be configured at JVM level and should not # involve changing encryption settings here: # https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/FIPS.html +# # *NOTE* No custom encryption options are enabled at the moment # The available internode options are : all, none, dc, rack -# # If set to dc cassandra will encrypt the traffic between the DCs # If set to rack cassandra will encrypt the traffic between the racks # # The passwords used in these options must match the passwords used when generating # the keystore and truststore. 
For instructions on generating these files, see: -# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore +# http://download.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore # server_encryption_options: + # set to true for allowing secure incoming connections + enabled: false + # If enabled and optional are both set to true, encrypted and unencrypted connections are handled on the storage_port + optional: false + # if enabled, will open up an encrypted listening socket on ssl_storage_port. Should be used + # during upgrade to 4.0; otherwise, set to false. + enable_legacy_ssl_storage_port: false + # on outbound connections, determine which type of peers to securely connect to. 'enabled' must be set to true. internode_encryption: none keystore: conf/.keystore keystore_password: cassandra @@ -952,7 +961,7 @@ server_encryption_options: # require_client_auth: false # require_endpoint_verification: false -# enable or disable client/server encryption. +# enable or disable client-to-server encryption. client_encryption_options: enabled: false # If enabled and optional is set to true encrypted and unencrypted connections are handled. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a28d492..de193b0 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -209,9 +209,7 @@ public class Config public double dynamic_snitch_badness_threshold = 0.1; public EncryptionOptions.ServerEncryptionOptions server_encryption_options = new EncryptionOptions.ServerEncryptionOptions(); - public EncryptionOptions.ClientEncryptionOptions client_encryption_options = new EncryptionOptions.ClientEncryptionOptions(); - // this encOptions is for backward compatibility (a warning is logged by DatabaseDescriptor) - public EncryptionOptions.ServerEncryptionOptions encryption_options; + public EncryptionOptions client_encryption_options = new EncryptionOptions(); public InternodeCompression internode_compression = InternodeCompression.none; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index d948abf..af1cbde 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -43,6 +43,7 @@ import org.apache.cassandra.auth.IAuthorizer; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.auth.IRoleManager; import org.apache.cassandra.config.Config.CommitLogSync; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; import org.apache.cassandra.db.ConsistencyLevel; import 
org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; @@ -648,13 +649,6 @@ public class DatabaseDescriptor throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '" + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false); - if(conf.encryption_options != null) - { - logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); - //operate under the assumption that server_encryption_options is not set in yaml rather than both - conf.server_encryption_options = conf.encryption_options; - } - if (conf.user_defined_function_fail_timeout < 0) throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); if (conf.user_defined_function_warn_timeout < 0) @@ -683,6 +677,14 @@ public class DatabaseDescriptor throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); } + // internode messaging encryption options + if (conf.server_encryption_options.internode_encryption != InternodeEncryption.none + && !conf.server_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in server_encryption_options when using peer-to-peer security. 
" + + "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false); + } + if (conf.max_value_size_in_mb <= 0) throw new ConfigurationException("max_value_size_in_mb must be positive", false); else if (conf.max_value_size_in_mb >= 2048) @@ -1638,6 +1640,11 @@ public class DatabaseDescriptor return listenAddress; } + public static void setListenAddress(InetAddress newlistenAddress) + { + listenAddress = newlistenAddress; + } + public static InetAddress getBroadcastAddress() { return broadcastAddress; @@ -1648,6 +1655,11 @@ public class DatabaseDescriptor return conf.listen_on_broadcast_address; } + public static void setShouldListenOnBroadcastAddress(boolean shouldListenOnBroadcastAddress) + { + conf.listen_on_broadcast_address = shouldListenOnBroadcastAddress; + } + public static void setListenOnBroadcastAddress(boolean listen_on_broadcast_address) { conf.listen_on_broadcast_address = listen_on_broadcast_address; @@ -1939,7 +1951,12 @@ public class DatabaseDescriptor return conf.server_encryption_options; } - public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions() + public static void setServerEncryptionOptions(EncryptionOptions.ServerEncryptionOptions encryptionOptions) + { + conf.server_encryption_options = encryptionOptions; + } + + public static EncryptionOptions getClientEncryptionOptions() { return conf.client_encryption_options; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/EncryptionOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java index 6010746..aecbfca 100644 --- a/src/java/org/apache/cassandra/config/EncryptionOptions.java +++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.config; -public 
abstract class EncryptionOptions +public class EncryptionOptions { public String keystore = "conf/.keystore"; public String keystore_password = "cassandra"; @@ -29,19 +29,52 @@ public abstract class EncryptionOptions public String store_type = "JKS"; public boolean require_client_auth = false; public boolean require_endpoint_verification = false; + public boolean enabled = false; + public boolean optional = false; - public static class ClientEncryptionOptions extends EncryptionOptions + public EncryptionOptions() + { } + + /** + * Copy constructor + */ + public EncryptionOptions(EncryptionOptions options) { - public boolean enabled = false; - public boolean optional = false; + keystore = options.keystore; + keystore_password = options.keystore_password; + truststore = options.truststore; + truststore_password = options.truststore_password; + cipher_suites = options.cipher_suites; + protocol = options.protocol; + algorithm = options.algorithm; + store_type = options.store_type; + require_client_auth = options.require_client_auth; + require_endpoint_verification = options.require_endpoint_verification; + enabled = options.enabled; + optional = options.optional; } public static class ServerEncryptionOptions extends EncryptionOptions { - public static enum InternodeEncryption + public enum InternodeEncryption { all, none, dc, rack } + public InternodeEncryption internode_encryption = InternodeEncryption.none; + public boolean enable_legacy_ssl_storage_port = false; + + public ServerEncryptionOptions() + { } + + /** + * Copy constructor + */ + public ServerEncryptionOptions(ServerEncryptionOptions options) + { + super(options); + internode_encryption = options.internode_encryption; + enable_legacy_ssl_storage_port = options.enable_legacy_ssl_storage_port; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java ---------------------------------------------------------------------- diff 
--git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java index 2235c76..0b344c9 100644 --- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java +++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java @@ -64,7 +64,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber @VisibleForTesting static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc) { - if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.portFor(publicAddress))) + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress))) { logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress); return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 2a44e68..4e6fe1c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1,4 +1,4 @@ -/* + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -729,10 +729,15 @@ public final class MessagingService implements MessagingServiceMBean public void listen() { + listen(DatabaseDescriptor.getServerEncryptionOptions()); + } + + public void listen(ServerEncryptionOptions serverEncryptionOptions) + { callbacks.reset(); // hack to allow tests to stop/restart MS - listen(FBUtilities.getLocalAddress()); + listen(FBUtilities.getLocalAddress(), serverEncryptionOptions); if (shouldListenOnBroadcastAddress()) - listen(FBUtilities.getBroadcastAddress()); + listen(FBUtilities.getBroadcastAddress(), serverEncryptionOptions); listenGate.signalAll(); } @@ -747,40 +752,54 @@ public final class MessagingService implements MessagingServiceMBean * * @param localEp InetAddress whose port to listen on. */ - private void listen(InetAddress localEp) throws ConfigurationException + private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptionOptions) throws ConfigurationException { IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator(); int receiveBufferSize = DatabaseDescriptor.getInternodeRecvBufferSize(); - if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none) + // this is the legacy socket, for letting peer nodes that haven't upgrade yet connect to this node. + // should only occur during cluster upgrade. we can remove this block at 5.0! 
+ if (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port) { + // clone the encryption options, and explicitly set the optional field to false + // (do not allow non-TLS connections on the legacy ssl port) + ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions); + legacyEncOptions.optional = false; + InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort()); - ChannelGroup channelGroup = new DefaultChannelGroup("EncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups()); - InboundInitializer initializer = new InboundInitializer(authenticator, DatabaseDescriptor.getServerEncryptionOptions(), channelGroup); + ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups()); + InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup); Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); - serverChannels.add(new ServerChannel(encryptedChannel, channelGroup)); + serverChannels.add(new ServerChannel(encryptedChannel, channelGroup, localAddr, ServerChannel.SecurityLevel.REQUIRED)); } - if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all) - { - InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); - ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups()); - InboundInitializer initializer = new InboundInitializer(authenticator, null, channelGroup); - Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); - serverChannels.add(new ServerChannel(channel, channelGroup)); - } - - if (serverChannels.isEmpty()) - throw new 
IllegalStateException("no listening channels set up in MessagingService!"); + // this is for the socket that can be plain, only ssl, or optional plain/ssl + InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); + ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups()); + InboundInitializer initializer = new InboundInitializer(authenticator, serverEncryptionOptions, channelGroup); + Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); + ServerChannel.SecurityLevel securityLevel = !serverEncryptionOptions.enabled ? ServerChannel.SecurityLevel.NONE : + serverEncryptionOptions.optional ? ServerChannel.SecurityLevel.OPTIONAL : + ServerChannel.SecurityLevel.REQUIRED; + serverChannels.add(new ServerChannel(channel, channelGroup, localAddr, securityLevel)); } /** * A simple struct to wrap up the components needed for each listening socket. + * <p> + * The {@link #securityLevel} is captured independently of the {@link #channel} as there's no real way to inspect a + * server-side 'channel' to check if it is using TLS or not (the channel's configured pipeline will only apply to + * connections that get created, so it's not inspectable). {@link #securityLevel} is really only used for testing, anyway. */ @VisibleForTesting static class ServerChannel { /** + * Declares the type of TLS used with the channel. + */ + enum SecurityLevel { NONE, OPTIONAL, REQUIRED } + + /** * The base {@link Channel} that is doing the socket listen/accept. */ private final Channel channel; @@ -790,23 +809,46 @@ public final class MessagingService implements MessagingServiceMBean * the inbound connections/channels can be closed when the listening socket itself is being closed. 
*/ private final ChannelGroup connectedChannels; + private final InetSocketAddress address; + private final SecurityLevel securityLevel; - private ServerChannel(Channel channel, ChannelGroup channelGroup) + private ServerChannel(Channel channel, ChannelGroup channelGroup, InetSocketAddress address, SecurityLevel securityLevel) { this.channel = channel; this.connectedChannels = channelGroup; + this.address = address; + this.securityLevel = securityLevel; } void close() { - channel.close().syncUninterruptibly(); - connectedChannels.close().syncUninterruptibly(); + if (channel.isOpen()) + channel.close().awaitUninterruptibly(); + connectedChannels.close().awaitUninterruptibly(); } - int size() + int size() { return connectedChannels.size(); } + + /** + * For testing only! + */ + Channel getChannel() + { + return channel; + } + + InetSocketAddress getAddress() + { + return address; + } + + SecurityLevel getSecurityLevel() + { + return securityLevel; + } } public void waitUntilListening() @@ -1038,6 +1080,11 @@ public final class MessagingService implements MessagingServiceMBean */ public void shutdown() { + shutdown(false); + } + + public void shutdown(boolean isTest) + { logger.info("Waiting for messaging service to quiesce"); // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first assert !StageManager.getStage(Stage.MUTATION).isShutdown(); @@ -1057,7 +1104,8 @@ public final class MessagingService implements MessagingServiceMBean for (OutboundMessagingPool pool : channelManagers.values()) pool.close(false); - NettyFactory.instance.close(); + if (!isTest) + NettyFactory.instance.close(); } catch (Exception e) { @@ -1065,6 +1113,14 @@ public final class MessagingService implements MessagingServiceMBean } } + /** + * For testing only! 
+ */ + void clearServerChannels() + { + serverChannels.clear(); + } + public void receive(MessageIn message, int id) { TraceState state = Tracing.instance.initializeFromMessage(message); @@ -1443,7 +1499,7 @@ public final class MessagingService implements MessagingServiceMBean if (pool == null) { final boolean secure = isEncryptedConnection(to); - final int port = portFor(secure); + final int port = portFor(to, secure); if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port)) return null; @@ -1463,15 +1519,25 @@ public final class MessagingService implements MessagingServiceMBean return pool; } - public static int portFor(InetAddress addr) + public int portFor(InetAddress addr) { final boolean secure = isEncryptedConnection(addr); - return portFor(secure); + return portFor(addr, secure); } - private static int portFor(boolean secure) + private int portFor(InetAddress address, boolean secure) { - return secure ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); + if (!secure) + return DatabaseDescriptor.getStoragePort(); + + Integer v = versions.get(address); + // if we don't know the version of the peer, assume it is 4.0 (or higher) as the only time it would be lower + // (as in a 3.x version) is during a cluster upgrade (from 3.x to 4.0). In that case the outbound connection will + // unfortunately fail - however the peer should connect to this node (at some point), and once we learn its version, it'll be + // in the versions map. thus, when we attempt to reconnect to that node, we'll have the version and we can get the correct port. + // we will be able to remove this logic at 5.0. + int version = v != null ? v.intValue() : VERSION_40; + return version < VERSION_40 ? 
DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java index d193e31..7fb81d3 100644 --- a/src/java/org/apache/cassandra/net/async/NettyFactory.java +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -1,8 +1,10 @@ package org.apache.cassandra.net.async; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.zip.Checksum; +import javax.annotation.Nullable; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; @@ -45,7 +47,6 @@ import net.jpountz.xxhash.XXHashFactory; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.security.SSLFactory; @@ -53,7 +54,6 @@ import org.apache.cassandra.service.NativeTransportService; import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.CoalescingStrategies; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NativeLibrary; /** * A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate @@ -70,17 +70,13 @@ public final class NettyFactory private static final int LZ4_HASH_SEED = 0x9747b28c; - /** - * Default seed value for xxhash. 
- */ - public static final int XXHASH_DEFAULT_SEED = 0x9747b28c; - public enum Mode { MESSAGING, STREAMING } - private static final String SSL_CHANNEL_HANDLER_NAME = "ssl"; - public static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor"; - public static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor"; - public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + static final String SSL_CHANNEL_HANDLER_NAME = "ssl"; + private static final String OPTIONAL_SSL_CHANNEL_HANDLER_NAME = "optionalSsl"; + static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor"; + static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor"; + private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; public static final String INBOUND_STREAM_HANDLER_NAME = "inboundStreamHandler"; /** a useful addition for debugging; simply set to true to get more data in your logs */ @@ -125,7 +121,7 @@ public final class NettyFactory NettyFactory(boolean useEpoll) { this.useEpoll = useEpoll; - acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption), + acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions()), "MessagingService-NettyAcceptor-Thread", false); inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Thread", false); outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Thread", true); @@ -134,19 +130,23 @@ public final class NettyFactory /** * Determine the number of accept threads we need, which is based upon the number of listening sockets we will have. - * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. 
If we have both, - * we'll have two sockets, and thus need two threads; else one socket and one thread. - * - * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address} - * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets - * - basically doubling the count. See CASSANDRA-9748 for more details. + * The idea is one accept thread per listening socket. */ - static int determineAcceptGroupSize(InternodeEncryption internode_encryption) + public static int determineAcceptGroupSize(ServerEncryptionOptions serverEncryptionOptions) { - int listenSocketCount = internode_encryption == InternodeEncryption.dc || internode_encryption == InternodeEncryption.rack ? 2 : 1; + int listenSocketCount = 1; + + boolean listenOnBroadcastAddr = MessagingService.shouldListenOnBroadcastAddress(); + if (listenOnBroadcastAddr) + listenSocketCount++; - if (MessagingService.shouldListenOnBroadcastAddress()) - listenSocketCount *= 2; + if (serverEncryptionOptions.enable_legacy_ssl_storage_port) + { + listenSocketCount++; + + if (listenOnBroadcastAddr) + listenSocketCount++; + } return listenSocketCount; } @@ -236,6 +236,28 @@ public final class NettyFactory return channelFuture.channel(); } + /** + * Creates a new {@link SslHandler} from provided SslContext. 
+ * @param peer enables endpoint verification for remote address when not null + */ + static SslHandler newSslHandler(Channel channel, SslContext sslContext, @Nullable InetSocketAddress peer) + { + if (peer == null) + { + return sslContext.newHandler(channel.alloc()); + } + else + { + logger.debug("Creating SSL handler for %s:%d", peer.getHostString(), peer.getPort()); + SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort()); + SSLEngine engine = sslHandler.engine(); + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); + return sslHandler; + } + } + public static class InboundInitializer extends ChannelInitializer<SocketChannel> { private final IInternodeAuthenticator authenticator; @@ -256,12 +278,20 @@ public final class NettyFactory ChannelPipeline pipeline = channel.pipeline(); // order of handlers: ssl -> logger -> handshakeHandler - if (encryptionOptions != null) + if (encryptionOptions.enabled) { - SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true); - SslHandler sslHandler = sslContext.newHandler(channel.alloc()); - logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); - pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); + if (encryptionOptions.optional) + { + pipeline.addFirst(OPTIONAL_SSL_CHANNEL_HANDLER_NAME, new OptionalSslHandler(encryptionOptions)); + } + else + { + SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true); + InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? 
channel.remoteAddress() : null; + SslHandler sslHandler = newSslHandler(channel, sslContext, peer); + logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); + pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); + } } if (WIRETRACE) @@ -271,7 +301,7 @@ public final class NettyFactory } } - private String encryptionLogStatement(ServerEncryptionOptions options) + private static String encryptionLogStatement(ServerEncryptionOptions options) { if (options == null) return "disabled"; @@ -287,9 +317,11 @@ public final class NettyFactory @VisibleForTesting public Bootstrap createOutboundBootstrap(OutboundConnectionParams params) { - logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(), + logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}, protocolVersion: {}", + params.connectionId.connectionAddress(), params.compress, encryptionLogStatement(params.encryptionOptions), - params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED); + params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED, + params.protocolVersion); Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class; Bootstrap bootstrap = new Bootstrap().group(params.mode == Mode.MESSAGING ? outboundGroup : streamingGroup) .channel(transport) @@ -315,6 +347,12 @@ public final class NettyFactory this.params = params; } + /** + * {@inheritDoc} + * + * To determine if we should enable TLS, we only need to check if {@link #params#encryptionOptions} is set. 
+ * The logic for figuring that out is located in {@link MessagingService#getMessagingConnection(InetAddress)}; + */ public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); @@ -323,22 +361,9 @@ public final class NettyFactory if (params.encryptionOptions != null) { SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false); - - final SslHandler sslHandler; - if (params.encryptionOptions.require_endpoint_verification) - { - InetSocketAddress peer = params.connectionId.remoteAddress(); - sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort()); - SSLEngine engine = sslHandler.engine(); - SSLParameters sslParameters = engine.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - engine.setSSLParameters(sslParameters); - } - else - { - sslHandler = sslContext.newHandler(channel.alloc()); - } - + // for some reason channel.remoteAddress() will return null + InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? 
params.connectionId.remoteAddress() : null; + SslHandler sslHandler = newSslHandler(channel, sslContext, peer); logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java new file mode 100644 index 0000000..b60ae13 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net.async; + +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.security.SSLFactory; + +public class OptionalSslHandler extends ByteToMessageDecoder +{ + private final ServerEncryptionOptions encryptionOptions; + + OptionalSslHandler(ServerEncryptionOptions encryptionOptions) + { + this.encryptionOptions = encryptionOptions; + } + + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception + { + if (in.readableBytes() < 5) + { + // To detect if SSL must be used we need to have at least 5 bytes, so return here and try again + // once more bytes are ready. + return; + } + + if (SslHandler.isEncrypted(in)) + { + // Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use encryption. + SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true); + Channel channel = ctx.channel(); + InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? (InetSocketAddress) channel.remoteAddress() : null; + SslHandler sslHandler = NettyFactory.newSslHandler(channel, sslContext, peer); + ctx.pipeline().replace(this, NettyFactory.SSL_CHANNEL_HANDLER_NAME, sslHandler); + } + else + { + // Connection uses no TLS/SSL encryption, just remove the detection handler and continue without + // SslHandler in the pipeline. 
+ ctx.pipeline().remove(this); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java index c834bd4..6b2ff0d 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java +++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java @@ -120,6 +120,12 @@ public class OutboundConnectionIdentifier return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType); } + public OutboundConnectionIdentifier withNewConnectionPort(int port) + { + return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port), + new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType); + } + /** * The local node address. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java index 6bda9cd..4522ba4 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java @@ -271,6 +271,7 @@ public class OutboundMessagingConnection } boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote()); + maybeUpdateConnectionId(); Bootstrap bootstrap = buildBootstrap(compress); ChannelFuture connectFuture = bootstrap.connect(); @@ -289,12 +290,38 @@ public class OutboundMessagingConnection || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost)); } + /** + * After a bounce we won't necessarily know the peer's version, so we assume the peer is at least 4.0 + * and thus using a single port for secure and non-secure communication. However, during a rolling upgrade from + * 3.0.x/3.x to 4.0, the not-yet upgraded peer is still listening on separate ports, but we don't know the peer's + * version until we can successfully connect. Fortunately, the peer can connect to this node, at which point + * we'll grab its version. We then use that knowledge to use the {@link Config#ssl_storage_port} to connect on, + * and to do that we need to update some member fields in this instance. 
+ * + * Note: can be removed at 5.0 + */ + void maybeUpdateConnectionId() + { + if (encryptionOptions != null) + { + int version = MessagingService.instance().getVersion(connectionId.remote()); + if (version < targetVersion) + { + targetVersion = version; + int port = MessagingService.instance().portFor(connectionId.remote()); + connectionId = connectionId.withNewConnectionPort(port); + logger.debug("changing connectionId to {}, with a different port for secure communication, because peer version is {}", connectionId, version); + } + } + } + private Bootstrap buildBootstrap(boolean compress) { boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay(); int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0 ? DatabaseDescriptor.getInternodeSendBufferSize() : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE; + OutboundConnectionParams params = OutboundConnectionParams.builder() .connectionId(connectionId) .callback(this::finishHandshake) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 0381416..b6351f9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -198,7 +198,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.index = index; OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0), - new InetSocketAddress(connecting, MessagingService.portFor(connecting))); + new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting))); this.messageSender = new NettyStreamingMessageSender(this, 
id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview()); this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index e7b812f..0812e53 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -242,7 +242,7 @@ public class BulkLoader } } - private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions) + private static SSLOptions buildSSLOptions(EncryptionOptions clientEncryptionOptions) { if (!clientEncryptionOptions.enabled) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/LoaderOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java index 38317b6..c821e6a 100644 --- a/src/java/org/apache/cassandra/tools/LoaderOptions.java +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -77,7 +77,7 @@ public class LoaderOptions public final int interDcThrottle; public final int storagePort; public final int sslStoragePort; - public final EncryptionOptions.ClientEncryptionOptions clientEncOptions; + public final EncryptionOptions clientEncOptions; public final int connectionsPerHost; public final EncryptionOptions.ServerEncryptionOptions serverEncOptions; public final Set<InetAddress> hosts; @@ -119,7 +119,7 @@ public class LoaderOptions int interDcThrottle = 0; int storagePort; int sslStoragePort; - EncryptionOptions.ClientEncryptionOptions clientEncOptions = new EncryptionOptions.ClientEncryptionOptions(); 
+ EncryptionOptions clientEncOptions = new EncryptionOptions(); int connectionsPerHost = 1; EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions(); Set<InetAddress> hosts = new HashSet<>(); @@ -208,7 +208,7 @@ public class LoaderOptions return this; } - public Builder encOptions(EncryptionOptions.ClientEncryptionOptions encOptions) + public Builder encOptions(EncryptionOptions encOptions) { this.clientEncOptions = encOptions; return this; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 4793d17..3632175 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -28,6 +28,7 @@ import com.google.common.base.Splitter; import org.apache.cassandra.auth.PasswordAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.Int32Type; @@ -37,13 +38,11 @@ import org.apache.cassandra.utils.Hex; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; -import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; - public class Client extends SimpleClient { private final SimpleEventHandler eventHandler = new SimpleEventHandler(); - public Client(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions) + public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions) { super(host, port, version, encryptionOptions); setEventHandler(eventHandler); @@ -248,7 +247,7 @@ public 
class Client extends SimpleClient int port = Integer.parseInt(args[1]); ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT; - ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions(); + EncryptionOptions encryptionOptions = new EncryptionOptions(); System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version); new Client(host, port, version, encryptionOptions).run(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 9408a3a..d3f1c2c 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -137,7 +137,7 @@ public class Server implements CassandraDaemon.Server if (this.useSSL) { - final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions(); + final EncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions(); if (clientEnc.optional) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index ddd3484..9c1fb07 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -41,6 +41,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; +import 
org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.security.SSLFactory; @@ -56,7 +57,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; +import io.netty.handler.ssl.SslHandler; public class SimpleClient implements Closeable { @@ -68,7 +69,7 @@ public class SimpleClient implements Closeable private static final Logger logger = LoggerFactory.getLogger(SimpleClient.class); public final String host; public final int port; - private final ClientEncryptionOptions encryptionOptions; + private final EncryptionOptions encryptionOptions; protected final ResponseHandler responseHandler = new ResponseHandler(); protected final Connection.Tracker tracker = new ConnectionTracker(); @@ -87,22 +88,22 @@ public class SimpleClient implements Closeable } }; - public SimpleClient(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions) + public SimpleClient(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions) { this(host, port, version, false, encryptionOptions); } - public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions) + public SimpleClient(String host, int port, EncryptionOptions encryptionOptions) { this(host, port, ProtocolVersion.CURRENT, encryptionOptions); } public SimpleClient(String host, int port, ProtocolVersion version) { - this(host, port, version, new ClientEncryptionOptions()); + this(host, port, version, new EncryptionOptions()); } - public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, ClientEncryptionOptions encryptionOptions) + public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, 
EncryptionOptions encryptionOptions) { this.host = host; this.port = port; @@ -115,7 +116,7 @@ public class SimpleClient implements Closeable public SimpleClient(String host, int port) { - this(host, port, new ClientEncryptionOptions()); + this(host, port, new EncryptionOptions()); } public void connect(boolean useCompression) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 3faa034..1cb59d4 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -835,7 +835,7 @@ public class FBUtilities } @VisibleForTesting - protected static void reset() + public static void reset() { localInetAddress = null; broadcastInetAddress = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index a082d56..f0a959e 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -36,6 +36,7 @@ import java.util.regex.*; import java.util.regex.Matcher; import com.google.common.collect.Iterables; +import com.google.common.net.InetAddresses; import com.codahale.metrics.Timer; @@ -43,6 +44,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import 
org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.ConfigurationException; @@ -77,7 +79,9 @@ public class MessagingServiceTest } }; - static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator(); + private static IInternodeAuthenticator originalAuthenticator; + private static ServerEncryptionOptions originalServerEncryptionOptions; + private static InetAddress originalListenAddress; private final MessagingService messagingService = MessagingService.test(); @@ -87,6 +91,9 @@ public class MessagingServiceTest DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap())); DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1")); + originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator(); + originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions(); + originalListenAddress = DatabaseDescriptor.getListenAddress(); } private static int metricScopeId = 0; @@ -101,9 +108,13 @@ public class MessagingServiceTest } @After - public void replaceAuthenticator() + public void tearDown() { DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); + DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions); + DatabaseDescriptor.setShouldListenOnBroadcastAddress(false); + DatabaseDescriptor.setListenAddress(originalListenAddress); + FBUtilities.reset(); } @Test @@ -465,39 +476,188 @@ public class MessagingServiceTest @Test public void testCloseInboundConnections() throws UnknownHostException, InterruptedException { - messagingService.listen(); - Assert.assertTrue(messagingService.isListening()); - Assert.assertTrue(messagingService.serverChannels.size() > 0); - for (ServerChannel serverChannel : messagingService.serverChannels) - Assert.assertEquals(0, serverChannel.size()); - - // now, 
create a connection and make sure it's in a channel group - InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort()); - OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server); - - CountDownLatch latch = new CountDownLatch(1); - OutboundConnectionParams params = OutboundConnectionParams.builder() - .mode(NettyFactory.Mode.MESSAGING) - .sendBufferSize(1 << 10) - .connectionId(id) - .callback(handshakeResult -> latch.countDown()) - .protocolVersion(MessagingService.current_version) - .build(); - Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params); - Channel channel = bootstrap.connect().awaitUninterruptibly().channel(); - Assert.assertNotNull(channel); - latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up - - int connectCount = 0; - for (ServerChannel serverChannel : messagingService.serverChannels) - connectCount += serverChannel.size(); - Assert.assertTrue(connectCount > 0); - - // last, shutdown the MS and make sure connections are removed - messagingService.shutdown(); - for (ServerChannel serverChannel : messagingService.serverChannels) - Assert.assertEquals(0, serverChannel.size()); + try + { + messagingService.listen(); + Assert.assertTrue(messagingService.isListening()); + Assert.assertTrue(messagingService.serverChannels.size() > 0); + for (ServerChannel serverChannel : messagingService.serverChannels) + Assert.assertEquals(0, serverChannel.size()); + + // now, create a connection and make sure it's in a channel group + InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort()); + OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server); + + CountDownLatch latch = new CountDownLatch(1); + OutboundConnectionParams 
params = OutboundConnectionParams.builder() + .mode(NettyFactory.Mode.MESSAGING) + .sendBufferSize(1 << 10) + .connectionId(id) + .callback(handshakeResult -> latch.countDown()) + .protocolVersion(MessagingService.current_version) + .build(); + Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params); + Channel channel = bootstrap.connect().awaitUninterruptibly().channel(); + Assert.assertNotNull(channel); + latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up + + int connectCount = 0; + for (ServerChannel serverChannel : messagingService.serverChannels) + connectCount += serverChannel.size(); + Assert.assertTrue(connectCount > 0); + } + finally + { + // last, shutdown the MS and make sure connections are removed + messagingService.shutdown(true); + for (ServerChannel serverChannel : messagingService.serverChannels) + Assert.assertEquals(0, serverChannel.size()); + messagingService.clearServerChannels(); + } + } + + @Test + public void listenPlainConnection() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = false; + listen(serverEncryptionOptions, false); + } + + @Test + public void listenPlainConnectionWithBroadcastAddr() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = false; + listen(serverEncryptionOptions, true); + } + + @Test + public void listenRequiredSecureConnection() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + serverEncryptionOptions.optional = false; + serverEncryptionOptions.enable_legacy_ssl_storage_port = false; + listen(serverEncryptionOptions, false); + } + + @Test + public void listenRequiredSecureConnectionWithBroadcastAddr() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + 
serverEncryptionOptions.optional = false; + serverEncryptionOptions.enable_legacy_ssl_storage_port = false; + listen(serverEncryptionOptions, true); + } + + @Test + public void listenRequiredSecureConnectionWithLegacyPort() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + serverEncryptionOptions.optional = false; + serverEncryptionOptions.enable_legacy_ssl_storage_port = true; + listen(serverEncryptionOptions, false); + } + + @Test + public void listenRequiredSecureConnectionWithBroadcastAddrAndLegacyPort() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + serverEncryptionOptions.optional = false; + serverEncryptionOptions.enable_legacy_ssl_storage_port = true; + listen(serverEncryptionOptions, true); } + @Test + public void listenOptionalSecureConnection() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + serverEncryptionOptions.optional = true; + listen(serverEncryptionOptions, false); + } + + @Test + public void listenOptionalSecureConnectionWithBroadcastAddr() + { + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = true; + serverEncryptionOptions.optional = true; + listen(serverEncryptionOptions, true); + } + + private void listen(ServerEncryptionOptions serverEncryptionOptions, boolean listenOnBroadcastAddr) + { + InetAddress listenAddress = null; + if (listenOnBroadcastAddr) + { + DatabaseDescriptor.setShouldListenOnBroadcastAddress(true); + listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress()); + DatabaseDescriptor.setListenAddress(listenAddress); + FBUtilities.reset(); + } + + try + { + messagingService.listen(serverEncryptionOptions); + Assert.assertTrue(messagingService.isListening()); + int expectedListeningCount = 
NettyFactory.determineAcceptGroupSize(serverEncryptionOptions); + Assert.assertEquals(expectedListeningCount, messagingService.serverChannels.size()); + + if (!serverEncryptionOptions.enabled) + { + // make sure no channel is using TLS + for (ServerChannel serverChannel : messagingService.serverChannels) + Assert.assertEquals(ServerChannel.SecurityLevel.NONE, serverChannel.getSecurityLevel()); + } + else + { + final int legacySslPort = DatabaseDescriptor.getSSLStoragePort(); + boolean foundLegacyListenSslAddress = false; + for (ServerChannel serverChannel : messagingService.serverChannels) + { + if (serverEncryptionOptions.optional) + Assert.assertEquals(ServerChannel.SecurityLevel.OPTIONAL, serverChannel.getSecurityLevel()); + else + Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel()); + + if (serverEncryptionOptions.enable_legacy_ssl_storage_port) + { + if (legacySslPort == serverChannel.getAddress().getPort()) + { + foundLegacyListenSslAddress = true; + Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel()); + } + } + } + + if (serverEncryptionOptions.enable_legacy_ssl_storage_port && !foundLegacyListenSslAddress) + Assert.fail("failed to find legacy ssl listen address"); + } + // check the optional listen address + if (listenOnBroadcastAddr) + { + int expectedCount = (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port) ? 
2 : 1; + int found = 0; + for (ServerChannel serverChannel : messagingService.serverChannels) + { + if (serverChannel.getAddress().getAddress().equals(listenAddress)) + found++; + } + + Assert.assertEquals(expectedCount, found); + } + } + finally + { + messagingService.shutdown(true); + messagingService.clearServerChannels(); + Assert.assertEquals(0, messagingService.serverChannels.size()); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java index 67b221a..0550490 100644 --- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java +++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java @@ -31,7 +31,6 @@ import org.junit.Test; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; -import io.netty.channel.DefaultEventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -44,10 +43,9 @@ import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.cassandra.auth.AllowAllInternodeAuthenticator; import org.apache.cassandra.auth.IInternodeAuthenticator; -import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.NettyFactory.InboundInitializer; @@ -154,10 +152,15 @@ public class 
NettyFactoryTest @Test public void deterineAcceptGroupSize() { - Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none)); - Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all)); - Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack)); - Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc)); + ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions(); + serverEncryptionOptions.enabled = false; + Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); + serverEncryptionOptions.enabled = true; + Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); + + serverEncryptionOptions.enable_legacy_ssl_storage_port = true; + Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); + serverEncryptionOptions.enable_legacy_ssl_storage_port = false; InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress(); try @@ -165,10 +168,13 @@ public class NettyFactoryTest FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress())); DatabaseDescriptor.setListenOnBroadcastAddress(true); - Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none)); - Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all)); - Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack)); - Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc)); + serverEncryptionOptions.enabled = false; + Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); + serverEncryptionOptions.enabled = true; + Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); + + serverEncryptionOptions.enable_legacy_ssl_storage_port = true; + Assert.assertEquals(4, 
NettyFactory.determineAcceptGroupSize(serverEncryptionOptions)); } finally { @@ -263,10 +269,13 @@ public class NettyFactoryTest @Test public void createInboundInitializer_WithoutSsl() throws Exception { - InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup); + ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions(); + encryptionOptions.enabled = false; + InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup); NioSocketChannel channel = new NioSocketChannel(); initializer.initChannel(channel); Assert.assertNull(channel.pipeline().get(SslHandler.class)); + Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class)); } private ServerEncryptionOptions encOptions() @@ -281,15 +290,33 @@ public class NettyFactoryTest encryptionOptions.cipher_suites = new String[] {"TLS_RSA_WITH_AES_128_CBC_SHA"}; return encryptionOptions; } + @Test public void createInboundInitializer_WithSsl() throws Exception { ServerEncryptionOptions encryptionOptions = encOptions(); + encryptionOptions.enabled = true; + encryptionOptions.optional = false; InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup); NioSocketChannel channel = new NioSocketChannel(); Assert.assertNull(channel.pipeline().get(SslHandler.class)); initializer.initChannel(channel); Assert.assertNotNull(channel.pipeline().get(SslHandler.class)); + Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class)); + } + + @Test + public void createInboundInitializer_WithOptionalSsl() throws Exception + { + ServerEncryptionOptions encryptionOptions = encOptions(); + encryptionOptions.enabled = true; + encryptionOptions.optional = true; + InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup); + NioSocketChannel channel = new NioSocketChannel(); + Assert.assertNull(channel.pipeline().get(SslHandler.class)); + 
initializer.initChannel(channel); + Assert.assertNotNull(channel.pipeline().get(OptionalSslHandler.class)); + Assert.assertNull(channel.pipeline().get(SslHandler.class)); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java index d6dd633..641c28c 100644 --- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.auth.AllowAllInternodeAuthenticator; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractEndpointSnitch; import org.apache.cassandra.locator.IEndpointSnitch; @@ -68,6 +69,7 @@ public class OutboundMessagingConnectionTest private EmbeddedChannel channel; private IEndpointSnitch snitch; + private ServerEncryptionOptions encryptionOptions; @BeforeClass public static void before() @@ -84,12 +86,14 @@ public class OutboundMessagingConnectionTest omc.setChannelWriter(ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty())); snitch = DatabaseDescriptor.getEndpointSnitch(); + encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions(); } @After public void tearDown() { DatabaseDescriptor.setEndpointSnitch(snitch); + DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions); channel.finishAndReleaseAll(); } @@ -471,4 +475,45 @@ public class 
OutboundMessagingConnectionTest Assert.assertNotSame(omc.getConnectionId(), originalId); Assert.assertSame(NOT_READY, omc.getState()); } + + @Test + public void maybeUpdateConnectionId_NoEncryption() + { + OutboundConnectionIdentifier connectionId = omc.getConnectionId(); + int version = omc.getTargetVersion(); + omc.maybeUpdateConnectionId(); + Assert.assertEquals(connectionId, omc.getConnectionId()); + Assert.assertEquals(version, omc.getTargetVersion()); + } + + @Test + public void maybeUpdateConnectionId_SameVersion() + { + ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions(); + omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator()); + OutboundConnectionIdentifier connectionId = omc.getConnectionId(); + int version = omc.getTargetVersion(); + omc.maybeUpdateConnectionId(); + Assert.assertEquals(connectionId, omc.getConnectionId()); + Assert.assertEquals(version, omc.getTargetVersion()); + } + + @Test + public void maybeUpdateConnectionId_3_X_Version() + { + ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions(); + encryptionOptions.enabled = true; + encryptionOptions.internode_encryption = ServerEncryptionOptions.InternodeEncryption.all; + DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions); + omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator()); + int peerVersion = MessagingService.VERSION_30; + MessagingService.instance().setVersion(connectionId.remote(), MessagingService.VERSION_30); + + OutboundConnectionIdentifier connectionId = omc.getConnectionId(); + omc.maybeUpdateConnectionId(); + Assert.assertNotEquals(connectionId, omc.getConnectionId()); + Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress()); + Assert.assertEquals(new 
InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress()); + Assert.assertEquals(peerVersion, omc.getTargetVersion()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java index 0c51eb7..4ade4ad 100644 --- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java +++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java @@ -68,7 +68,7 @@ public class ProtocolBetaVersionTest extends CQLTester createTable("CREATE TABLE %s (pk int PRIMARY KEY, v int)"); assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version - try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions.ClientEncryptionOptions())) + try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions())) { client.connect(false); for (int i = 0; i < 10; i++) @@ -103,7 +103,7 @@ public class ProtocolBetaVersionTest extends CQLTester } assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version - try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions.ClientEncryptionOptions())) + try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions())) { client.connect(false); fail("Exception should have been thrown"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index 817cb06..5b8067e 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -29,6 +29,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.cql3.BatchQueryOptions; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryHandler; @@ -48,7 +49,6 @@ import org.apache.cassandra.transport.messages.QueryMessage; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.MD5Digest; -import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public class MessagePayloadTest extends CQLTester @@ -127,7 +127,7 @@ public class MessagePayloadTest extends CQLTester nativePort, ProtocolVersion.V5, true, - new ClientEncryptionOptions()); + new EncryptionOptions()); try { client.connect(false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java index a6248bb..6acc500 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java @@ -23,7 +23,6 @@ package org.apache.cassandra.stress.settings; import java.io.Serializable; import java.util.Arrays; -import java.util.HashMap; import 
java.util.List; import java.util.Map; @@ -39,9 +38,9 @@ public class SettingsTransport implements Serializable this.options = options; } - public EncryptionOptions.ClientEncryptionOptions getEncryptionOptions() + public EncryptionOptions getEncryptionOptions() { - EncryptionOptions.ClientEncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); + EncryptionOptions encOptions = new EncryptionOptions(); if (options.trustStore.present()) { encOptions.enabled = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java index a27b986..af35490 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java @@ -134,7 +134,7 @@ public class StressSettings implements Serializable if (client != null) return client; - EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions(); + EncryptionOptions encOptions = transport.getEncryptionOptions(); JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions); c.connect(mode.compression()); if (keyspace != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java index d404653..4928cd2 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java +++ 
b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java @@ -50,7 +50,7 @@ public class JavaDriverClient public final int connectionsPerHost; private final ProtocolVersion protocolVersion; - private final EncryptionOptions.ClientEncryptionOptions encryptionOptions; + private final EncryptionOptions encryptionOptions; private Cluster cluster; private Session session; private final LoadBalancingPolicy loadBalancingPolicy; @@ -59,10 +59,10 @@ public class JavaDriverClient public JavaDriverClient(StressSettings settings, String host, int port) { - this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions()); + this(settings, host, port, new EncryptionOptions()); } - public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions) + public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions encryptionOptions) { this.protocolVersion = settings.mode.protocolVersion; this.host = host; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
