Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 17528910b -> a7895cfb2
Support for both encrypted and unencrypted native transport connections patch by Stefan Podkowinski; reviewed by Robert Stupp for CASSANDRA-9590 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7895cfb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7895cfb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7895cfb Branch: refs/heads/cassandra-3.0 Commit: a7895cfb2cdc667041402648bf922265cb0d34c3 Parents: 1752891 Author: Stefan Podkowinski <[email protected]> Authored: Sat Sep 5 18:23:10 2015 +0200 Committer: Robert Stupp <[email protected]> Committed: Sat Sep 5 18:23:10 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 4 + conf/cassandra.yaml | 8 + .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 25 +++ .../cassandra/service/CassandraDaemon.java | 38 +++- .../service/NativeTransportService.java | 199 +++++++++++++++++++ .../cassandra/service/StorageService.java | 11 +- .../org/apache/cassandra/transport/Server.java | 170 +++++++++------- .../org/apache/cassandra/cql3/CQLTester.java | 2 +- .../service/NativeTransportServiceTest.java | 193 ++++++++++++++++++ 10 files changed, 565 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d4e6771..afd45e5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +3.0.0-rc1 + * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590) + + 3.0.0-beta2 * Fix columns returned by AbstractBtreePartitions (CASSANDRA-10220) * Fix backward compatibility issue due to AbstractBounds serialization bug (CASSANDRA-9857) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 0f8b829..28caa1e 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -489,6 +489,14 @@ start_native_transport: true # port for the CQL native transport to listen for clients on # For security reasons, you should not expose this port to the internet. Firewall it if needed. native_transport_port: 9042 +# Enabling native transport encryption in client_encryption_options allows you to either use +# encryption for the standard port or to use a dedicated, additional port along with the unencrypted +# standard native_transport_port. +# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption +# for native_transport_port. Setting native_transport_port_ssl to a different value +# from native_transport_port will use encryption for native_transport_port_ssl while +# keeping native_transport_port unencrypted. +# native_transport_port_ssl: 9142 # The maximum threads for handling requests when the native transport is used. 
# This is similar to rpc_max_threads though the default differs slightly (and # there is no native_transport_min_threads, idle threads will always be stopped http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 22b09d3..164dab2 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -131,6 +131,7 @@ public class Config public Boolean start_native_transport = false; public Integer native_transport_port = 9042; + public Integer native_transport_port_ssl = null; public Integer native_transport_max_threads = 128; public Integer native_transport_max_frame_size_in_mb = 256; public volatile Long native_transport_max_concurrent_connections = -1L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2e68418..99cd563 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -680,6 +680,14 @@ public class DatabaseDescriptor conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + + // native transport encryption options + if (conf.native_transport_port_ssl != null + && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue() + && 
!conf.client_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + } } private static FileStore guessFileStore(String dir) throws IOException @@ -1341,6 +1349,23 @@ public class DatabaseDescriptor return Integer.parseInt(System.getProperty("cassandra.native_transport_port", conf.native_transport_port.toString())); } + @VisibleForTesting + public static void setNativeTransportPort(int port) + { + conf.native_transport_port = port; + } + + public static int getNativeTransportPortSSL() + { + return conf.native_transport_port_ssl == null ? getNativeTransportPort() : conf.native_transport_port_ssl; + } + + @VisibleForTesting + public static void setNativeTransportPortSSL(Integer port) + { + conf.native_transport_port_ssl = port; + } + public static Integer getNativeTransportMaxThreads() { return conf.native_transport_max_threads; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index c8b9677..230b46a 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -127,7 +127,7 @@ public class CassandraDaemon private static final CassandraDaemon instance = new CassandraDaemon(); public Server thriftServer; - public Server nativeServer; + private NativeTransportService nativeTransportService; private final boolean runManaged; protected final StartupChecks startupChecks; @@ -365,9 +365,7 @@ public class CassandraDaemon thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog); // Native transport - InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress(); - int nativePort = 
DatabaseDescriptor.getNativeTransportPort(); - nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); + nativeTransportService = new NativeTransportService(); completeSetup(); } @@ -431,7 +429,8 @@ public class CassandraDaemon String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) { - nativeServer.start(); + startNativeTransport(); + StorageService.instance.setRpcReady(true); } else logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it"); @@ -453,9 +452,12 @@ public class CassandraDaemon // On linux, this doesn't entirely shut down Cassandra, just the RPC server. // jsvc takes care of taking the rest down logger.info("Cassandra shutting down..."); - thriftServer.stop(); - nativeServer.stop(); - + if (thriftServer != null) + thriftServer.stop(); + if (nativeTransportService != null) + nativeTransportService.destroy(); + StorageService.instance.setRpcReady(false); + // On windows, we need to stop the entire system as prunsrv doesn't have the jsvc hooks // We rely on the shutdown hook to drain the node if (FBUtilities.isWindows()) @@ -556,6 +558,26 @@ public class CassandraDaemon } } + public void startNativeTransport() + { + if (nativeTransportService == null) + throw new IllegalStateException("setup() must be called first for CassandraDaemon"); + else + nativeTransportService.start(); + } + + public void stopNativeTransport() + { + if (nativeTransportService != null) + nativeTransportService.stop(); + } + + public boolean isNativeTransportRunning() + { + return nativeTransportService != null ? nativeTransportService.isRunning() : false; + } + + /** * A convenience method to stop and destroy the daemon in one shot. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/NativeTransportService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java new file mode 100644 index 0000000..eff3a89 --- /dev/null +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.ClientMetrics; +import org.apache.cassandra.transport.RequestThreadPoolExecutor; +import org.apache.cassandra.transport.Server; + +/** + * Handles native transport server lifecycle and associated resources. Lazily initialized. + */ +public class NativeTransportService +{ + + private static final Logger logger = LoggerFactory.getLogger(NativeTransportService.class); + + private Collection<Server> servers = Collections.emptyList(); + + private boolean initialized = false; + private EventLoopGroup workerGroup; + private EventExecutor eventExecutorGroup; + + /** + * Creates netty thread pools and event loops. 
+ */ + @VisibleForTesting + synchronized void initialize() + { + if (initialized) + return; + + // prepare netty resources + eventExecutorGroup = new RequestThreadPoolExecutor(); + + if (useEpoll()) + { + workerGroup = new EpollEventLoopGroup(); + logger.info("Netty using native Epoll event loop"); + } + else + { + workerGroup = new NioEventLoopGroup(); + logger.info("Netty using Java NIO event loop"); + } + + int nativePort = DatabaseDescriptor.getNativeTransportPort(); + int nativePortSSL = DatabaseDescriptor.getNativeTransportPortSSL(); + InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress(); + + org.apache.cassandra.transport.Server.Builder builder = new org.apache.cassandra.transport.Server.Builder() + .withEventExecutor(eventExecutorGroup) + .withEventLoopGroup(workerGroup) + .withHost(nativeAddr); + + if (!DatabaseDescriptor.getClientEncryptionOptions().enabled) + { + servers = Collections.singleton(builder.withSSL(false).withPort(nativePort).build()); + } + else + { + if (nativePort != nativePortSSL) + { + // user asked for dedicated ssl port for supporting both non-ssl and ssl connections + servers = Collections.unmodifiableList( + Arrays.asList( + builder.withSSL(false).withPort(nativePort).build(), + builder.withSSL(true).withPort(nativePortSSL).build() + ) + ); + } + else + { + // ssl only mode using configured native port + servers = Collections.singleton(builder.withSSL(true).withPort(nativePort).build()); + } + } + + // register metrics + ClientMetrics.instance.addCounter("connectedNativeClients", () -> + { + int ret = 0; + for (Server server : servers) + ret += server.getConnectedClients(); + return ret; + }); + + initialized = true; + } + + /** + * Starts native transport servers. + */ + public void start() + { + initialize(); + servers.forEach(Server::start); + } + + /** + * Stops currently running native transport servers. 
+ */ + public void stop() + { + servers.forEach(Server::stop); + } + + /** + * Ultimately stops servers and closes all resources. + */ + public void destroy() + { + stop(); + servers = Collections.emptyList(); + + // shutdown executors used by netty for native transport server + Future<?> wgStop = workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + + try + { + wgStop.await(5000); + } + catch (InterruptedException e1) + { + Thread.currentThread().interrupt(); + } + + // shutdownGracefully not implemented yet in RequestThreadPoolExecutor + eventExecutorGroup.shutdown(); + } + + /** + * @return true if the native epoll based event loop is enabled and available + */ + public static boolean useEpoll() + { + final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true")); + return enableEpoll && Epoll.isAvailable(); + } + + /** + * @return true in case native transport server is running + */ + public boolean isRunning() + { + for (Server server : servers) + if (server.isRunning()) return true; + return false; + } + + @VisibleForTesting + EventLoopGroup getWorkerGroup() + { + return workerGroup; + } + + @VisibleForTesting + EventExecutor getEventExecutor() + { + return eventExecutorGroup; + } + + @VisibleForTesting + Collection<Server> getServers() + { + return servers; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a7ffc04..2d9bbec 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -405,10 +405,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { throw new IllegalStateException("No configured daemon"); } - + try { - daemon.nativeServer.start(); 
+ daemon.startNativeTransport(); } catch (Exception e) { @@ -422,17 +422,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { throw new IllegalStateException("No configured daemon"); } - if (daemon.nativeServer != null) - daemon.nativeServer.stop(); + daemon.stopNativeTransport(); } public boolean isNativeTransportRunning() { - if ((daemon == null) || (daemon.nativeServer == null)) + if (daemon == null) { return false; } - return daemon.nativeServer.isRunning(); + return daemon.isNativeTransportRunning(); } public void stopTransports() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 72a1b60..cafc0ce 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -22,9 +22,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.EnumMap; -import java.util.Map; import java.util.List; -import java.util.concurrent.Callable; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; @@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; -import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; @@ -51,7 +49,6 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.db.marshal.AbstractType; -import 
org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; import org.apache.cassandra.transport.messages.EventMessage; @@ -64,7 +61,7 @@ public class Server implements CassandraDaemon.Server } private static final Logger logger = LoggerFactory.getLogger(Server.class); - private static final boolean enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true")); + private static final boolean useEpoll = NativeTransportService.useEpoll(); public static final int VERSION_1 = 1; public static final int VERSION_2 = 2; @@ -83,41 +80,32 @@ public class Server implements CassandraDaemon.Server }; public final InetSocketAddress socket; + public boolean useSSL = false; private final AtomicBoolean isRunning = new AtomicBoolean(false); private EventLoopGroup workerGroup; private EventExecutor eventExecutorGroup; - public Server(InetSocketAddress socket) + private Server (Builder builder) { - this.socket = socket; + this.socket = builder.getSocket(); + this.useSSL = builder.useSSL; + if (builder.workerGroup != null) + { + workerGroup = builder.workerGroup; + } + else + { + if (useEpoll) + workerGroup = new EpollEventLoopGroup(); + else + workerGroup = new NioEventLoopGroup(); + } + if (builder.eventExecutorGroup != null) + eventExecutorGroup = builder.eventExecutorGroup; EventNotifier notifier = new EventNotifier(this); StorageService.instance.register(notifier); MigrationManager.instance.register(notifier); - registerMetrics(); - } - - public Server(String hostname, int port) - { - this(new InetSocketAddress(hostname, port)); - } - - public Server(InetAddress host, int port) - { - this(new InetSocketAddress(host, port)); - } - - public Server(int port) - { - this(new InetSocketAddress(port)); - } - - public void start() - { - if(!isRunning()) - { - run(); - } } public void stop() @@ -131,35 +119,25 @@ public class Server implements CassandraDaemon.Server return 
isRunning.get(); } - private void run() + public synchronized void start() { - // Configure the server. - eventExecutorGroup = new RequestThreadPoolExecutor(); - - boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false; - if (hasEpoll) - { - workerGroup = new EpollEventLoopGroup(); - logger.info("Netty using native Epoll event loop"); - } - else - { - workerGroup = new NioEventLoopGroup(); - logger.info("Netty using Java NIO event loop"); - } + if(isRunning()) + return; + // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap() - .group(workerGroup) - .channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class) + .channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_LINGER, 0) .childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive()) .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator) .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); + if (workerGroup != null) + bootstrap = bootstrap.group(workerGroup); final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions(); - if (clientEnc.enabled) + if (this.useSSL) { logger.info("Enabling encrypted CQL connections between client and server"); bootstrap.childHandler(new SecureInitializer(this, clientEnc)); @@ -171,7 +149,7 @@ public class Server implements CassandraDaemon.Server // Bind and start to accept incoming connections. logger.info("Using Netty Version: {}", Version.identify().entrySet()); - logger.info("Starting listening for CQL clients on {}...", socket); + logger.info("Starting listening for CQL clients on {} ({})...", socket, this.useSSL ? 
"encrypted" : "unencrypted"); ChannelFuture bindFuture = bootstrap.bind(socket); if (!bindFuture.awaitUninterruptibly().isSuccess()) @@ -179,36 +157,83 @@ public class Server implements CassandraDaemon.Server connectionTracker.allChannels.add(bindFuture.channel()); isRunning.set(true); - - StorageService.instance.setRpcReady(true); } - private void registerMetrics() + public int getConnectedClients() { - ClientMetrics.instance.addCounter("connectedNativeClients", new Callable<Integer>() - { - @Override - public Integer call() throws Exception - { - return connectionTracker.getConnectedClients(); - } - }); + return connectionTracker.getConnectedClients(); } - + private void close() { // Close opened connections connectionTracker.closeAll(); - workerGroup.shutdownGracefully(); - workerGroup = null; - - eventExecutorGroup.shutdown(); - eventExecutorGroup = null; + logger.info("Stop listening for CQL clients"); - - StorageService.instance.setRpcReady(false); } + public static class Builder + { + private EventLoopGroup workerGroup; + private EventExecutor eventExecutorGroup; + private boolean useSSL = false; + private InetAddress hostAddr; + private int port = -1; + private InetSocketAddress socket; + + public Builder withSSL(boolean useSSL) + { + this.useSSL = useSSL; + return this; + } + + public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup) + { + this.workerGroup = eventLoopGroup; + return this; + } + + public Builder withEventExecutor(EventExecutor eventExecutor) + { + this.eventExecutorGroup = eventExecutor; + return this; + } + + public Builder withHost(InetAddress host) + { + this.hostAddr = host; + this.socket = null; + return this; + } + + public Builder withPort(int port) + { + this.port = port; + this.socket = null; + return this; + } + + public Server build() + { + return new Server(this); + } + + private InetSocketAddress getSocket() + { + if (this.socket != null) + return this.socket; + else + { + if (this.port == -1) + throw new 
IllegalStateException("Missing port number"); + if (this.hostAddr != null) + this.socket = new InetSocketAddress(this.hostAddr, this.port); + else + throw new IllegalStateException("Missing host"); + return this.socket; + } + } + } public static class ConnectionTracker implements Connection.Tracker { @@ -253,7 +278,7 @@ public class Server implements CassandraDaemon.Server } } - private static class Initializer extends ChannelInitializer + private static class Initializer extends ChannelInitializer<Channel> { // Stateless handlers private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder(); @@ -294,7 +319,10 @@ public class Server implements CassandraDaemon.Server pipeline.addLast("messageDecoder", messageDecoder); pipeline.addLast("messageEncoder", messageEncoder); - pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher); + if (server.eventExecutorGroup != null) + pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher); + else + pipeline.addLast("executor", dispatcher); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 349975d..3d3729a 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -316,7 +316,7 @@ public abstract class CQLTester StorageService.instance.initServer(); SchemaLoader.startGossiper(); - server = new org.apache.cassandra.transport.Server(nativeAddr, nativePort); + server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build(); server.start(); for (int version = 1; version <= maxProtocolVersion; version++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7895cfb/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java new file mode 100644 index 0000000..7eb664f --- /dev/null +++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.Sets; +import org.junit.After; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.Pair; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NativeTransportServiceTest +{ + + @After + public void resetConfig() + { + DatabaseDescriptor.getClientEncryptionOptions().enabled = false; + DatabaseDescriptor.setNativeTransportPortSSL(null); + } + + @Test + public void testServiceCanBeStopped() + { + withService((NativeTransportService service) -> { + service.stop(); + assertFalse(service.isRunning()); + }); + } + + @Test + public void testIgnoresStartOnAlreadyStarted() + { + withService((NativeTransportService service) -> { + service.start(); + service.start(); + service.start(); + }); + } + + @Test + public void testIgnoresStoppedOnAlreadyStopped() + { + withService((NativeTransportService service) -> { + service.stop(); + service.stop(); + service.stop(); + }); + } + + @Test + public void testDestroy() + { + withService((NativeTransportService service) -> { + Supplier<Boolean> allTerminated = () -> + service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated() && + service.getEventExecutor().isShutdown() && service.getEventExecutor().isTerminated(); + assertFalse(allTerminated.get()); + service.destroy(); + assertTrue(allTerminated.get()); + }); + } + + @Test + public void testConcurrentStarts() + { + withService(NativeTransportService::start, false, 20); + } + + @Test + public void testConcurrentStops() + { + withService(NativeTransportService::stop, true, 20); + } + + @Test + public void 
testConcurrentDestroys() + { + withService(NativeTransportService::destroy, true, 20); + } + + @Test + public void testPlainDefaultPort() + { + // default plain settings: client encryption disabled and default native transport port + withService((NativeTransportService service) -> + { + assertEquals(1, service.getServers().size()); + Server server = service.getServers().iterator().next(); + assertFalse(server.useSSL); + assertEquals(server.socket.getPort(), DatabaseDescriptor.getNativeTransportPort()); + }); + } + + @Test + public void testSSLOnly() + { + // default ssl settings: client encryption enabled and default native transport port used for ssl only + DatabaseDescriptor.getClientEncryptionOptions().enabled = true; + + withService((NativeTransportService service) -> + { + service.initialize(); + assertEquals(1, service.getServers().size()); + Server server = service.getServers().iterator().next(); + assertTrue(server.useSSL); + assertEquals(server.socket.getPort(), DatabaseDescriptor.getNativeTransportPort()); + }, false, 1); + } + + @Test + public void testSSLWithNonSSL() + { + // ssl+non-ssl settings: client encryption enabled and additional ssl port specified + DatabaseDescriptor.getClientEncryptionOptions().enabled = true; + DatabaseDescriptor.setNativeTransportPortSSL(8432); + + withService((NativeTransportService service) -> + { + service.initialize(); + assertEquals(2, service.getServers().size()); + assertEquals( + Sets.newHashSet(Arrays.asList( + Pair.create(true, DatabaseDescriptor.getNativeTransportPortSSL()), + Pair.create(false, DatabaseDescriptor.getNativeTransportPort()) + ) + ), + service.getServers().stream().map((Server s) -> + Pair.create(s.useSSL, s.socket.getPort())).collect(Collectors.toSet()) + ); + }, false, 1); + } + + private static void withService(Consumer<NativeTransportService> f) + { + withService(f, true, 1); + } + + private static void withService(Consumer<NativeTransportService> f, boolean start, int concurrently) + { + 
NativeTransportService service = new NativeTransportService(); + assertFalse(service.isRunning()); + if (start) + { + service.start(); + assertTrue(service.isRunning()); + } + try + { + if (concurrently == 1) + { + f.accept(service); + } + else + { + IntStream.range(0, concurrently).parallel().map((int i) -> { + f.accept(service); + return 1; + }).sum(); + } + } + finally + { + service.stop(); + } + } +}
