http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/security/SSLFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java index 33c1ad6..3c1293f 100644 --- a/src/java/org/apache/cassandra/security/SSLFactory.java +++ b/src/java/org/apache/cassandra/security/SSLFactory.java @@ -17,63 +17,67 @@ */ package org.apache.cassandra.security; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.io.InputStream; + import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; -import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Date; import java.util.Enumeration; import java.util.List; - +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; -import javax.net.ssl.SSLServerSocket; import javax.net.ssl.SSLSocket; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; -import org.apache.cassandra.config.EncryptionOptions; -import org.apache.cassandra.io.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import 
io.netty.handler.ssl.SupportedCipherSuiteFilter; +import io.netty.util.ReferenceCountUtil; +import org.apache.cassandra.config.EncryptionOptions; /** - * A Factory for providing and setting up Client and Server SSL wrapped - * Socket and ServerSocket + * A Factory for providing and setting up client {@link SSLSocket}s. Also provides + * methods for creating both JSSE {@link SSLContext} instances as well as netty {@link SslContext} instances. + * + * Netty {@link SslContext} instances are expensive to create (as well as to destroy) and consume a lot of resources + * (especially direct memory), but instances can be reused across connections (assuming the SSL params are the same). + * Hence we cache created instances in {@link #clientSslContext} and {@link #serverSslContext}. */ public final class SSLFactory { private static final Logger logger = LoggerFactory.getLogger(SSLFactory.class); - private static boolean checkedExpiry = false; - public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException - { - SSLContext ctx = createSSLContext(options, true); - SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket(); - try - { - serverSocket.setReuseAddress(true); - prepareSocket(serverSocket, options); - serverSocket.bind(new InetSocketAddress(address, port), 500); - return serverSocket; - } - catch (IllegalArgumentException | SecurityException | IOException e) - { - serverSocket.close(); - throw e; - } - } + @VisibleForTesting + static volatile boolean checkedExpiry = false; + + /** + * A cached reference of the {@link SslContext} for client-facing connections. + */ + private static final AtomicReference<SslContext> clientSslContext = new AtomicReference<>(); + + /** + * A cached reference of the {@link SslContext} for peer-to-peer, internode messaging connections. 
+ */ + private static final AtomicReference<SslContext> serverSslContext = new AtomicReference<>(); /** Create a socket and connect */ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException @@ -109,37 +113,6 @@ public final class SSLFactory } } - /** Just create a socket */ - public static SSLSocket getSocket(EncryptionOptions options) throws IOException - { - SSLContext ctx = createSSLContext(options, true); - SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(); - try - { - prepareSocket(socket, options); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } - } - - /** Sets relevant socket options specified in encryption settings */ - private static void prepareSocket(SSLServerSocket serverSocket, EncryptionOptions options) - { - String[] suites = filterCipherSuites(serverSocket.getSupportedCipherSuites(), options.cipher_suites); - if(options.require_endpoint_verification) - { - SSLParameters sslParameters = serverSocket.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - serverSocket.setSSLParameters(sslParameters); - } - serverSocket.setEnabledCipherSuites(suites); - serverSocket.setNeedClientAuth(options.require_client_auth); - } - /** Sets relevant socket options specified in encryption settings */ private static void prepareSocket(SSLSocket socket, EncryptionOptions options) { @@ -153,28 +126,50 @@ public final class SSLFactory socket.setEnabledCipherSuites(suites); } + /** + * Create a JSSE {@link SSLContext}. 
+ */ @SuppressWarnings("resource") public static SSLContext createSSLContext(EncryptionOptions options, boolean buildTruststore) throws IOException { - InputStream tsf = null; - InputStream ksf = null; - SSLContext ctx; + TrustManager[] trustManagers = null; + if (buildTruststore) + trustManagers = buildTrustManagerFactory(options).getTrustManagers(); + + KeyManagerFactory kmf = buildKeyManagerFactory(options); + try { - ctx = SSLContext.getInstance(options.protocol); - TrustManager[] trustManagers = null; + SSLContext ctx = SSLContext.getInstance(options.protocol); + ctx.init(kmf.getKeyManagers(), trustManagers, null); + return ctx; + } + catch (Exception e) + { + throw new IOException("Error creating/initializing the SSL Context", e); + } + } - if(buildTruststore) - { - tsf = Files.newInputStream(Paths.get(options.truststore)); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm); - KeyStore ts = KeyStore.getInstance(options.store_type); - ts.load(tsf, options.truststore_password.toCharArray()); - tmf.init(ts); - trustManagers = tmf.getTrustManagers(); - } + static TrustManagerFactory buildTrustManagerFactory(EncryptionOptions options) throws IOException + { + try (InputStream tsf = Files.newInputStream(Paths.get(options.truststore))) + { + TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm); + KeyStore ts = KeyStore.getInstance(options.store_type); + ts.load(tsf, options.truststore_password.toCharArray()); + tmf.init(ts); + return tmf; + } + catch (Exception e) + { + throw new IOException("failed to build trust manager store for secure connections", e); + } + } - ksf = Files.newInputStream(Paths.get((options.keystore))); + static KeyManagerFactory buildKeyManagerFactory(EncryptionOptions options) throws IOException + { + try (InputStream ksf = Files.newInputStream(Paths.get(options.keystore))) + { KeyManagerFactory kmf = KeyManagerFactory.getInstance(options.algorithm); KeyStore ks = 
KeyStore.getInstance(options.store_type); ks.load(ksf, options.keystore_password.toCharArray()); @@ -193,20 +188,12 @@ public final class SSLFactory checkedExpiry = true; } kmf.init(ks, options.keystore_password.toCharArray()); - - ctx.init(kmf.getKeyManagers(), trustManagers, null); - + return kmf; } catch (Exception e) { - throw new IOException("Error creating the initializing the SSL Context", e); + throw new IOException("failed to build trust manager store for secure connections", e); } - finally - { - FileUtils.closeQuietly(tsf); - FileUtils.closeQuietly(ksf); - } - return ctx; } public static String[] filterCipherSuites(String[] supported, String[] desired) @@ -223,4 +210,65 @@ public final class SSLFactory } return ret; } + + /** + * get a netty {@link SslContext} instance + */ + public static SslContext getSslContext(EncryptionOptions options, boolean buildTruststore, boolean forServer) throws IOException + { + return getSslContext(options, buildTruststore, forServer, OpenSsl.isAvailable()); + } + + /** + * Get a netty {@link SslContext} instance. + */ + @VisibleForTesting + static SslContext getSslContext(EncryptionOptions options, boolean buildTruststore, boolean forServer, boolean useOpenSsl) throws IOException + { + if (forServer && serverSslContext.get() != null) + return serverSslContext.get(); + if (!forServer && clientSslContext.get() != null) + return clientSslContext.get(); + + /* + There is a case where the netty/openssl combo might not support using KeyManagerFactory. specifically, + I've seen this with the netty-tcnative dynamic openssl implementation. using the netty-tcnative static-boringssl + works fine with KeyManagerFactory. If we want to support all of the netty-tcnative options, we would need + to fall back to passing in a file reference for both a x509 and PKCS#8 private key file in PEM format (see + {@link SslContextBuilder#forServer(File, File, String)}). However, we are not supporting that now to keep + the config/yaml API simple. 
+ */ + KeyManagerFactory kmf = null; + if (forServer || options.require_client_auth) + kmf = buildKeyManagerFactory(options); + + SslContextBuilder builder; + if (forServer) + { + builder = SslContextBuilder.forServer(kmf); + builder.clientAuth(options.require_client_auth ? ClientAuth.REQUIRE : ClientAuth.NONE); + } + else + { + builder = SslContextBuilder.forClient().keyManager(kmf); + } + + builder.sslProvider(useOpenSsl ? SslProvider.OPENSSL : SslProvider.JDK); + + // only set the cipher suites if the operator has explicitly configured values for it; else, use the default + // for each ssl implementation (jdk or openssl) + if (options.cipher_suites != null && options.cipher_suites.length > 0) + builder.ciphers(Arrays.asList(options.cipher_suites), SupportedCipherSuiteFilter.INSTANCE); + + if (buildTruststore) + builder.trustManager(buildTrustManagerFactory(options)); + + SslContext ctx = builder.build(); + AtomicReference<SslContext> ref = forServer ? serverSslContext : clientSslContext; + if (ref.compareAndSet(null, ctx)) + return ctx; + + ReferenceCountUtil.release(ctx); + return ref.get(); + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java index a2ad66c..d88d63c 100644 --- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java +++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java @@ -19,13 +19,18 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; +import java.nio.channels.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.OutboundTcpConnectionPool; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.utils.FBUtilities; public class DefaultConnectionFactory implements StreamConnectionFactory { @@ -47,20 +52,15 @@ public class DefaultConnectionFactory implements StreamConnectionFactory int attempts = 0; while (true) { - Socket socket = null; try { - socket = OutboundTcpConnectionPool.newSocket(peer); + Socket socket = newSocket(peer); socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); socket.setKeepAlive(true); return socket; } catch (IOException e) { - if (socket != null) - { - socket.close(); - } if (++attempts >= MAX_CONNECT_ATTEMPTS) throw e; @@ -77,4 +77,21 @@ public class DefaultConnectionFactory implements StreamConnectionFactory } } } + + // TODO this is deliberately copied from (the now former) OutboundTcpConnectionPool, for CASSANDRA-8457. 
+ // to be replaced in CASSANDRA-12229 (make streaming use 8457) + public static Socket newSocket(InetAddress endpoint) throws IOException + { + // zero means 'bind on any available port.' + if (MessagingService.isEncryptedConnection(endpoint)) + { + return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort()); + } + else + { + SocketChannel channel = SocketChannel.open(); + channel.connect(new InetSocketAddress(endpoint, DatabaseDescriptor.getStoragePort())); + return channel.socket(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 33e1967..4cdddba 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -316,8 +316,7 @@ public abstract class Tracing implements ExecutorLocal<TraceState> } /** - * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces - * that are not initiated by local node == coordinator). + * Called for non-local traces (traces that are not initiated by local node == coordinator). 
*/ public abstract void trace(ByteBuffer sessionId, String message, int ttl); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/tracing/TracingImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java b/src/java/org/apache/cassandra/tracing/TracingImpl.java index d774abb..789216e 100644 --- a/src/java/org/apache/cassandra/tracing/TracingImpl.java +++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java @@ -99,8 +99,7 @@ class TracingImpl extends Tracing } /** - * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for non-local traces (traces - * that are not initiated by local node == coordinator). + * Called for non-local traces (traces that are not initiated by local node == coordinator). */ public void trace(final ByteBuffer sessionId, final String message, final int ttl) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 881ee81..1afe910 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -610,7 +610,9 @@ public abstract class Message message = "Unexpected exception during request; channel = <unprintable>"; } - if (!alwaysLogAtError && exception instanceof IOException) + // netty wraps SSL errors in a CodecException + boolean isIOException = exception instanceof IOException || (exception.getCause() instanceof IOException); + if (!alwaysLogAtError && isIOException) { if (ioExceptionsAtDebugLevel.contains(exception.getMessage())) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/Server.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 28f99e8..9408a3a 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.*; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -40,6 +41,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.Version; import io.netty.util.concurrent.EventExecutor; @@ -343,31 +345,18 @@ public class Server implements CassandraDaemon.Server protected abstract static class AbstractSecureIntializer extends Initializer { - private final SSLContext sslContext; private final EncryptionOptions encryptionOptions; protected AbstractSecureIntializer(Server server, EncryptionOptions encryptionOptions) { super(server); this.encryptionOptions = encryptionOptions; - try - { - this.sslContext = SSLFactory.createSSLContext(encryptionOptions, encryptionOptions.require_client_auth); - } - catch (IOException e) - { - throw new RuntimeException("Failed to setup secure pipeline", e); - } } - protected final SslHandler createSslHandler() + protected final SslHandler createSslHandler(ByteBufAllocator allocator) throws IOException { - SSLEngine sslEngine = sslContext.createSSLEngine(); - sslEngine.setUseClientMode(false); - String[] suites = SSLFactory.filterCipherSuites(sslEngine.getSupportedCipherSuites(), encryptionOptions.cipher_suites); - 
sslEngine.setEnabledCipherSuites(suites); - sslEngine.setNeedClientAuth(encryptionOptions.require_client_auth); - return new SslHandler(sslEngine); + SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, encryptionOptions.require_client_auth, true); + return sslContext.newHandler(allocator); } } @@ -396,7 +385,7 @@ public class Server implements CassandraDaemon.Server { // Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use // encryption. - SslHandler sslHandler = createSslHandler(); + SslHandler sslHandler = createSslHandler(channel.alloc()); channelHandlerContext.pipeline().replace(this, "ssl", sslHandler); } else @@ -419,7 +408,7 @@ public class Server implements CassandraDaemon.Server protected void initChannel(Channel channel) throws Exception { - SslHandler sslHandler = createSslHandler(); + SslHandler sslHandler = createSslHandler(channel.alloc()); super.initChannel(channel); channel.pipeline().addFirst("ssl", sslHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 13cd9bd..c72d6e9 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -28,8 +28,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +38,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; +import 
io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.cql3.QueryOptions; @@ -58,7 +57,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.handler.ssl.SslHandler; import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; public class SimpleClient implements Closeable @@ -287,21 +285,11 @@ public class SimpleClient implements Closeable private class SecureInitializer extends Initializer { - private final SSLContext sslContext; - - public SecureInitializer() throws IOException - { - this.sslContext = SSLFactory.createSSLContext(encryptionOptions, true); - } - protected void initChannel(Channel channel) throws Exception { super.initChannel(channel); - SSLEngine sslEngine = sslContext.createSSLEngine(); - sslEngine.setUseClientMode(true); - String[] suites = SSLFactory.filterCipherSuites(sslEngine.getSupportedCipherSuites(), encryptionOptions.cipher_suites); - sslEngine.setEnabledCipherSuites(suites); - channel.pipeline().addFirst("ssl", new SslHandler(sslEngine)); + SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, encryptionOptions.require_client_auth, true); + channel.pipeline().addFirst("ssl", sslContext.newHandler(channel.alloc())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/CoalescingStrategies.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java index 9f3b118..6ff91e3 100644 --- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java +++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java @@ -17,13 +17,10 @@ */ package 
org.apache.cassandra.utils; -import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.FileUtils; - import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.RandomAccessFile; @@ -32,19 +29,17 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel.MapMode; import java.util.Arrays; import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; import java.util.Locale; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +/** + * Groups strategies to coalesce messages. + */ public class CoalescingStrategies { - static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class); - /* * Log debug information at info level about what the average is and when coalescing is enabled/disabled */ @@ -54,6 +49,8 @@ public class CoalescingStrategies private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path"; private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug"); + public enum Strategy { MOVINGAVERAGE, FIXED, TIMEHORIZON, DISABLED } + static { if (DEBUG_COALESCING) @@ -68,98 +65,83 @@ public class CoalescingStrategies } } - @VisibleForTesting - interface Clock - { - long nanoTime(); - } - - @VisibleForTesting - static Clock CLOCK = new Clock() - { - public long nanoTime() - { - return System.nanoTime(); - } - }; - public static interface Coalescable { long timestampNanos(); } @VisibleForTesting - static void parkLoop(long nanos) - { - long now = System.nanoTime(); - final long timer = now + nanos; - 
// We shouldn't loop if it's within a few % of the target sleep time if on a second iteration. - // See CASSANDRA-8692. - final long limit = timer - nanos / 16; - do - { - LockSupport.parkNanos(timer - now); - now = System.nanoTime(); - } - while (now < limit); - } - - private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker) + static long determineCoalescingTime(long averageGap, long maxCoalesceWindow) { - // Do not sleep if there are still items in the backlog (CASSANDRA-13090). - if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages()) - return false; + // Don't bother waiting at all if we're unlikely to get any new message within our max window + if (averageGap > maxCoalesceWindow) + return -1; - // only sleep if we can expect to double the number of messages we're sending in the time interval - long sleep = messages * averageGap; - if (sleep <= 0 || sleep > maxCoalesceWindow) - return false; + // avoid the degenerate case of zero (very unlikely, but let's be safe) + if (averageGap <= 0) + return maxCoalesceWindow; // assume we receive as many messages as we expect; apply the same logic to the future batch: // expect twice as many messages to consider sleeping for "another" interval; this basically translates - // to doubling our sleep period until we exceed our max sleep window + // to doubling our sleep period until we exceed our max sleep window. + long sleep = averageGap; while (sleep * 2 < maxCoalesceWindow) sleep *= 2; - parker.park(sleep); - return true; + return sleep; } - public static abstract class CoalescingStrategy + /** + * A coalescing strategy, that decides when to coalesce messages. + * <p> + * The general principle is that, when asked, the strategy returns the time delay we want to wait for more messages + * to arrive before sending so message can be coalesced. 
For that, the strategy must be fed new messages through + * the {@link #newArrival(Coalescable)} method (the only assumption we make on messages is that they have an associated + * timestamp). The strategy can then be queried for the time to wait for coalescing through + * {@link #currentCoalescingTimeNanos()}. + * <p> + * Note that it is expected that a call to {@link #currentCoalescingTimeNanos()} will come just after a call to + * {@link #newArrival(Coalescable)}, as the intent of the value returned by the former method is "Given a new message, how much + * time should I wait for more messages to arrive and be coalesced with that message". But both calls are separated + * as one may not want to call {@link #currentCoalescingTimeNanos()} after every call to {@link #newArrival(Coalescable)} + * and we thus save processing. How arrivals influence the coalescing time is however entirely up to the strategy and some + * strategies may ignore arrivals completely and return a constant coalescing time. + */ + public interface CoalescingStrategy + { + /** + * Inform the strategy of a new message to consider. + * + * @param message the message to consider. + */ + void newArrival(Coalescable message); + + /** + * The current time to wait for the purpose of coalescing messages. + * + * @return the coalescing time. A negative value can be returned if no coalescing should be done (which can be a + * transient thing). 
+ */ + long currentCoalescingTimeNanos(); + } + + public static abstract class AbstractCoalescingStrategy implements CoalescingStrategy { - protected final Parker parker; protected final Logger logger; protected volatile boolean shouldLogAverage = false; protected final ByteBuffer logBuffer; private RandomAccessFile ras; private final String displayName; - protected CoalescingStrategy(Parker parker, Logger logger, String displayName) + protected AbstractCoalescingStrategy(Logger logger, String displayName) { - this.parker = parker; this.logger = logger; this.displayName = displayName; - if (DEBUG_COALESCING) - { - NamedThreadFactory.createThread(() -> - { - while (true) - { - try - { - Thread.sleep(5000); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } - shouldLogAverage = true; - } - }, displayName + " debug thread").start(); - } + RandomAccessFile rasTemp = null; ByteBuffer logBufferTemp = null; if (DEBUG_COALESCING) { + ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> shouldLogAverage = true, 5, 5, TimeUnit.SECONDS); try { File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH)); @@ -214,44 +196,10 @@ public class CoalescingStrategies } } } - - /** - * Drain from the input blocking queue to the output list up to maxItems elements. - * - * The coalescing strategy may choose to park the current thread if it thinks it will - * be able to produce an output list with more elements. - * - * @param input Blocking queue to retrieve elements from - * @param out Output list to place retrieved elements in. Must be empty. 
- * @param maxItems Maximum number of elements to place in the output list - */ - public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException - { - Preconditions.checkArgument(out.isEmpty(), "out list should be empty"); - coalesceInternal(input, out, maxItems); - } - - protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException; - } @VisibleForTesting - interface Parker - { - void park(long nanos); - } - - private static final Parker PARKER = new Parker() - { - @Override - public void park(long nanos) - { - parkLoop(nanos); - } - }; - - @VisibleForTesting - static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy + static class TimeHorizonMovingAverageCoalescingStrategy extends AbstractCoalescingStrategy { // for now we'll just use 64ms per bucket; this can be made configurable, but results in ~1s for 16 samples private static final int INDEX_SHIFT = 26; @@ -261,7 +209,7 @@ public class CoalescingStrategies private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1); // the minimum timestamp we will now accept updates for; only moves forwards, never backwards - private long epoch = CLOCK.nanoTime(); + private long epoch; // the buckets, each following on from epoch; the measurements run from ix(epoch) to ix(epoch - 1) // ix(epoch-1) is a partial result, that is never actually part of the calculation, and most updates // are expected to hit this bucket @@ -269,31 +217,12 @@ public class CoalescingStrategies private long sum = 0; private final long maxCoalesceWindow; - public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName) + public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Logger logger, String displayName, long initialEpoch) { - super(parker, logger, displayName); + super(logger, 
displayName); this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow); sum = 0; - } - - private void logSample(long nanos) - { - debugTimestamp(nanos); - long epoch = this.epoch; - long delta = nanos - epoch; - if (delta < 0) - // have to simply ignore, but would be a bit crazy to get such reordering - return; - - if (delta > INTERVAL) - epoch = rollepoch(delta, epoch, nanos); - - int ix = ix(nanos); - samples[ix]++; - - // if we've updated an old bucket, we need to update the sum to match - if (ix != ix(epoch - 1)) - sum++; + epoch = initialEpoch; } private long averageGap() @@ -304,7 +233,7 @@ public class CoalescingStrategies } // this sample extends past the end of the range we cover, so rollover - private long rollepoch(long delta, long epoch, long nanos) + private long rollEpoch(long delta, long epoch, long nanos) { if (delta > 2 * INTERVAL) { @@ -341,30 +270,32 @@ public class CoalescingStrategies return (int) ((nanos >>> INDEX_SHIFT) & 15); } - @Override - protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + public void newArrival(Coalescable message) { - if (input.drainTo(out, maxItems) == 0) - { - out.add(input.take()); - input.drainTo(out, maxItems - out.size()); - } + final long timestamp = message.timestampNanos(); + debugTimestamp(timestamp); + long epoch = this.epoch; + long delta = timestamp - epoch; + if (delta < 0) + // have to simply ignore, but would be a bit unlucky to get such reordering + return; - for (Coalescable qm : out) - logSample(qm.timestampNanos()); + if (delta > INTERVAL) + epoch = rollEpoch(delta, epoch, timestamp); + + int ix = ix(timestamp); + samples[ix]++; + // if we've updated an old bucket, we need to update the sum to match + if (ix != ix(epoch - 1)) + sum++; + } + + public long currentCoalescingTimeNanos() + { long averageGap = averageGap(); debugGap(averageGap); - - int count = out.size(); - if (maybeSleep(count, 
averageGap, maxCoalesceWindow, parker)) - { - input.drainTo(out, maxItems - out.size()); - int prevCount = count; - count = out.size(); - for (int i = prevCount; i < count; i++) - logSample(out.get(i).timestampNanos()); - } + return determineCoalescingTime(averageGap, maxCoalesceWindow); } @Override @@ -374,25 +305,27 @@ public class CoalescingStrategies } } - /* + /** * Start coalescing by sleeping if the moving average is < the requested window. * The actual time spent waiting to coalesce will be the min( window, moving average * 2) * The actual amount of time spent waiting can be greater than the window. For instance * observed time spent coalescing was 400 microseconds with the window set to 200 in one benchmark. */ @VisibleForTesting - static class MovingAverageCoalescingStrategy extends CoalescingStrategy + static class MovingAverageCoalescingStrategy extends AbstractCoalescingStrategy { - private final int samples[] = new int[16]; + static final int SAMPLE_SIZE = 16; + private final int samples[] = new int[SAMPLE_SIZE]; + private final long maxCoalesceWindow; + private long lastSample = 0; private int index = 0; private long sum = 0; + private long currentGap; - private final long maxCoalesceWindow; - - public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName) + public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Logger logger, String displayName) { - super(parker, logger, displayName); + super(logger, displayName); this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow); for (int ii = 0; ii < samples.length; ii++) samples[ii] = Integer.MAX_VALUE; @@ -406,42 +339,29 @@ public class CoalescingStrategies samples[index] = value; index++; index = index & ((1 << 4) - 1); - return sum / 16; + return sum / SAMPLE_SIZE; } - private long notifyOfSample(long sample) + public void newArrival(Coalescable message) { - debugTimestamp(sample); - if (sample > lastSample) + final long timestamp
= message.timestampNanos(); + debugTimestamp(timestamp); + if (timestamp > lastSample) { - final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - lastSample)); - lastSample = sample; - return logSample(delta); + final int delta = (int)(Math.min(Integer.MAX_VALUE, timestamp - lastSample)); + lastSample = timestamp; + currentGap = logSample(delta); } else { - return logSample(1); + currentGap = logSample(1); } } - @Override - protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + public long currentCoalescingTimeNanos() { - if (input.drainTo(out, maxItems) == 0) - { - out.add(input.take()); - input.drainTo(out, maxItems - out.size()); - } - - long average = notifyOfSample(out.get(0).timestampNanos()); - debugGap(average); - - if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) { - input.drainTo(out, maxItems - out.size()); - } - - for (int ii = 1; ii < out.size(); ii++) - notifyOfSample(out.get(ii).timestampNanos()); + debugGap(currentGap); + return determineCoalescingTime(currentGap, maxCoalesceWindow); } @Override @@ -451,35 +371,28 @@ public class CoalescingStrategies } } - /* + /** * A fixed strategy as a backup in case MovingAverage or TimeHorizonMovingAverage fails in some scenario */ @VisibleForTesting - static class FixedCoalescingStrategy extends CoalescingStrategy + static class FixedCoalescingStrategy extends AbstractCoalescingStrategy { private final long coalesceWindow; - public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName) + public FixedCoalescingStrategy(int coalesceWindowMicros, Logger logger, String displayName) { - super(parker, logger, displayName); + super(logger, displayName); coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros); } - @Override - protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ public void newArrival(Coalescable message) { - int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages(); + debugTimestamp(message.timestampNanos()); + } - if (input.drainTo(out, maxItems) == 0) - { - out.add(input.take()); - input.drainTo(out, maxItems - out.size()); - if (out.size() < enough) { - parker.park(coalesceWindow); - input.drainTo(out, maxItems - out.size()); - } - } - debugTimestamps(out); + public long currentCoalescingTimeNanos() + { + return coalesceWindow; } @Override @@ -489,84 +402,43 @@ public class CoalescingStrategies } } - /* - * A coalesscing strategy that just returns all currently available elements - */ - @VisibleForTesting - static class DisabledCoalescingStrategy extends CoalescingStrategy + public static Optional<CoalescingStrategy> newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName) { + String strategyCleaned = strategy.trim().toUpperCase(Locale.ENGLISH); - public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName) - { - super(parker, logger, displayName); - } - - @Override - protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException + try { - if (input.drainTo(out, maxItems) == 0) + switch (Enum.valueOf(Strategy.class, strategyCleaned)) { - out.add(input.take()); - input.drainTo(out, maxItems - 1); + case MOVINGAVERAGE: + return Optional.of(new MovingAverageCoalescingStrategy(coalesceWindow, logger, displayName)); + case FIXED: + return Optional.of(new FixedCoalescingStrategy(coalesceWindow, logger, displayName)); + case TIMEHORIZON: + long initialEpoch = System.nanoTime(); + return Optional.of(new TimeHorizonMovingAverageCoalescingStrategy(coalesceWindow, logger, displayName, initialEpoch)); + case DISABLED: + return Optional.empty(); + default: + throw new IllegalArgumentException("supported coalese strategy"); } - debugTimestamps(out); } - - 
@Override - public String toString() + catch (IllegalArgumentException iae) { - return "Disabled"; - } - } - - @VisibleForTesting - static CoalescingStrategy newCoalescingStrategy(String strategy, - int coalesceWindow, - Parker parker, - Logger logger, - String displayName) - { - String classname = null; - String strategyCleaned = strategy.trim().toUpperCase(Locale.ENGLISH); - switch(strategyCleaned) - { - case "MOVINGAVERAGE": - classname = MovingAverageCoalescingStrategy.class.getName(); - break; - case "FIXED": - classname = FixedCoalescingStrategy.class.getName(); - break; - case "TIMEHORIZON": - classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName(); - break; - case "DISABLED": - classname = DisabledCoalescingStrategy.class.getName(); - break; - default: - classname = strategy; - } + try + { + Class<?> clazz = Class.forName(strategy); - try - { - Class<?> clazz = Class.forName(classname); + if (!CoalescingStrategy.class.isAssignableFrom(clazz)) + throw new RuntimeException(strategy + " is not an instance of CoalescingStrategy"); - if (!CoalescingStrategy.class.isAssignableFrom(clazz)) + Constructor<?> constructor = clazz.getConstructor(int.class, Logger.class, String.class); + return Optional.of((CoalescingStrategy)constructor.newInstance(coalesceWindow, logger, displayName)); + } + catch (Exception e) { - throw new RuntimeException(classname + " is not an instance of CoalescingStrategy"); + throw new RuntimeException(e); } - - Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class); - - return (CoalescingStrategy)constructor.newInstance(coalesceWindow, parker, logger, displayName); - } - catch (Exception e) - { - throw new RuntimeException(e); } } - - public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName) - { - return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName); - } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 3fa64b3..58c3371 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -147,6 +147,13 @@ public class FBUtilities return broadcastInetAddress; } + /** + * <b>THIS IS FOR TESTING ONLY!!</b> + */ + public static void setBroadcastInetAddress(InetAddress addr) + { + broadcastInetAddress = addr; + } public static InetAddress getBroadcastRpcAddress() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/NativeLibrary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NativeLibrary.java b/src/java/org/apache/cassandra/utils/NativeLibrary.java index 735d51a..7d54791 100644 --- a/src/java/org/apache/cassandra/utils/NativeLibrary.java +++ b/src/java/org/apache/cassandra/utils/NativeLibrary.java @@ -49,7 +49,7 @@ public final class NativeLibrary OTHER; } - private static final OSType osType; + public static final OSType osType; private static final int MCL_CURRENT; private static final int MCL_FUTURE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/conf/cassandra_ssl_test.keystore ---------------------------------------------------------------------- diff --git a/test/conf/cassandra_ssl_test.keystore b/test/conf/cassandra_ssl_test.keystore new file mode 100644 index 0000000..8b2b218 Binary files /dev/null and b/test/conf/cassandra_ssl_test.keystore differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/conf/cassandra_ssl_test.truststore ---------------------------------------------------------------------- diff --git 
a/test/conf/cassandra_ssl_test.truststore b/test/conf/cassandra_ssl_test.truststore new file mode 100644 index 0000000..49cf332 Binary files /dev/null and b/test/conf/cassandra_ssl_test.truststore differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/db/ReadCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 249d780..056089e 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -18,10 +18,13 @@ package org.apache.cassandra.db; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -46,6 +49,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableMetadata; @@ -306,4 +310,33 @@ public class ReadCommandTest assertEquals(expectedRows.length, i); } } + + public void serializerTest() throws IOException + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); + + new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key")) + .clustering("dd") + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + .apply(); + + ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").build(); + int messagingVersion = MessagingService.current_version; + long size = 
ReadCommand.serializer.serializedSize(readCommand, messagingVersion); + + FakeOutputStream out = new FakeOutputStream(); + ReadCommand.serializer.serialize(readCommand, new WrappedDataOutputStreamPlus(out), messagingVersion); + Assert.assertEquals(size, out.count); + } + + static class FakeOutputStream extends OutputStream + { + long count; + + public void write(int b) throws IOException + { + count++; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java index 5e99523..09973a8 100644 --- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java @@ -21,12 +21,10 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.EnumMap; import java.util.Map; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -37,8 +35,6 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.OutboundTcpConnectionPool; import org.apache.cassandra.service.StorageService; import static org.junit.Assert.assertEquals; @@ -102,22 +98,6 @@ public class EC2SnitchTest assertEquals("2d", snitch.getRack(local)); } - @Test - public void testEc2MRSnitch() throws UnknownHostException - { - InetAddress me = InetAddress.getByName("127.0.0.2"); - InetAddress com_ip = InetAddress.getByName("127.0.0.3"); - - OutboundTcpConnectionPool pool = 
MessagingService.instance().getConnectionPool(me); - Assert.assertEquals(me, pool.endPoint()); - pool.reset(com_ip); - Assert.assertEquals(com_ip, pool.endPoint()); - - MessagingService.instance().destroyConnectionPool(me); - pool = MessagingService.instance().getConnectionPool(me); - Assert.assertEquals(com_ip, pool.endPoint()); - } - @AfterClass public static void tearDown() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 7cb3cfd..a082d56 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -20,10 +20,9 @@ */ package org.apache.cassandra.net; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; @@ -31,23 +30,31 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.*; import java.util.regex.Matcher; import com.google.common.collect.Iterables; + import com.codahale.metrics.Timer; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import org.apache.cassandra.auth.IInternodeAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.ConfigurationException; -import 
org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.apache.cassandra.net.MessagingService.ServerChannel; +import org.apache.cassandra.net.async.NettyFactory; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; +import org.apache.cassandra.net.async.OutboundConnectionParams; +import org.apache.cassandra.net.async.OutboundMessagingPool; +import org.apache.cassandra.utils.FBUtilities; import org.caffinitas.ohc.histo.EstimatedHistogram; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -93,6 +100,12 @@ public class MessagingServiceTest messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3")); } + @After + public void replaceAuthenticator() + { + DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); + } + @Test public void testDroppedMessages() { @@ -197,7 +210,7 @@ public class MessagingServiceTest @Test public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); MessageOut<?> ignored = null; @@ -218,7 +231,7 @@ public class MessagingServiceTest @Test public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); boolean timeout = false; @@ -242,7 +255,7 @@ public class MessagingServiceTest @Test public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); boolean timeout = true; @@ -285,13 +298,7 @@ public class MessagingServiceTest private static void addDCLatency(long sentAt, long nowTime) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) - { - out.writeInt((int) sentAt); - } - DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); - MessageIn.readConstructionTime(InetAddress.getLocalHost(), in, nowTime); + MessageIn.deriveConstructionTime(InetAddress.getLocalHost(), (int)sentAt, nowTime); } public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState> @@ -414,32 +421,83 @@ public class MessagingServiceTest InetAddress address = 
InetAddress.getByName("127.0.0.250"); //Should return null - assertNull(ms.getConnectionPool(address)); - assertNull(ms.getConnection(address, new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK))); + MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK); + assertFalse(ms.isConnected(address, messageOut)); //Should tolerate null ms.convict(address); - ms.sendOneWay(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), address); + ms.sendOneWay(messageOut, address); } @Test - public void testOutboundTcpConnectionCleansUp() throws Exception + public void testOutboundMessagingConnectionCleansUp() throws Exception { MessagingService ms = MessagingService.instance(); - DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); - InetAddress address = InetAddress.getByName("127.0.0.250"); - OutboundTcpConnectionPool pool = new OutboundTcpConnectionPool(address, new MockBackPressureStrategy(null).newState(address)); - ms.connectionManagers.put(address, pool); - pool.smallMessages.start(); - pool.smallMessages.enqueue(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0); - pool.smallMessages.join(); - assertFalse(ms.connectionManagers.containsKey(address)); + InetSocketAddress local = new InetSocketAddress("127.0.0.1", 9876); + InetSocketAddress remote = new InetSocketAddress("127.0.0.2", 9876); + + OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote.getAddress()), ALLOW_NOTHING_AUTHENTICATOR); + ms.channelManagers.put(remote.getAddress(), pool); + pool.sendMessage(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0); + assertFalse(ms.channelManagers.containsKey(remote.getAddress())); } - @After - public void replaceAuthenticator() + @Test + public void reconnectWithNewIp() throws UnknownHostException { - DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); + InetAddress publicIp = InetAddress.getByName("127.0.0.2"); + 
InetAddress privateIp = InetAddress.getByName("127.0.0.3"); + + // reset the preferred IP value, for good test hygiene + SystemKeyspace.updatePreferredIP(publicIp, publicIp); + + // create pool/conn with public addr + Assert.assertEquals(publicIp, messagingService.getCurrentEndpoint(publicIp)); + messagingService.reconnectWithNewIp(publicIp, privateIp); + Assert.assertEquals(privateIp, messagingService.getCurrentEndpoint(publicIp)); + + messagingService.destroyConnectionPool(publicIp); + + // recreate the pool/conn, and make sure the preferred ip addr is used + Assert.assertEquals(privateIp, messagingService.getCurrentEndpoint(publicIp)); + } + + @Test + public void testCloseInboundConnections() throws UnknownHostException, InterruptedException + { + messagingService.listen(); + Assert.assertTrue(messagingService.isListening()); + Assert.assertTrue(messagingService.serverChannels.size() > 0); + for (ServerChannel serverChannel : messagingService.serverChannels) + Assert.assertEquals(0, serverChannel.size()); + + // now, create a connection and make sure it's in a channel group + InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort()); + OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server); + + CountDownLatch latch = new CountDownLatch(1); + OutboundConnectionParams params = OutboundConnectionParams.builder() + .mode(NettyFactory.Mode.MESSAGING) + .sendBufferSize(1 << 10) + .connectionId(id) + .callback(handshakeResult -> latch.countDown()) + .protocolVersion(MessagingService.current_version) + .build(); + Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params); + Channel channel = bootstrap.connect().awaitUninterruptibly().channel(); + Assert.assertNotNull(channel); + latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up + + int connectCount = 0; + for
(ServerChannel serverChannel : messagingService.serverChannels) + connectCount += serverChannel.size(); + Assert.assertTrue(connectCount > 0); + + // last, shutdown the MS and make sure connections are removed + messagingService.shutdown(); + for (ServerChannel serverChannel : messagingService.serverChannels) + Assert.assertEquals(0, serverChannel.size()); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java deleted file mode 100644 index e3b6817..0000000 --- a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.net; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.MessagingService.Verb; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and - * non-droppable messages. - */ -public class OutboundTcpConnectionTest -{ - AtomicInteger messageId = new AtomicInteger(0); - - final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout - final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable - - final static long NANOS_FOR_TIMEOUT; - - static - { - DatabaseDescriptor.daemonInitialization(); - NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(VERB_DROPPABLE.getTimeout()*2); - } - - /** - * Verifies our assumptions whether a Verb can be dropped or not. The tests make use of droppabilty, and - * may produce wrong test results if their droppabilty is changed. 
- */ - @BeforeClass - public static void assertDroppability() - { - if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE)) - throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable"); - if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE)) - throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable"); - } - - /** - * Tests that non-droppable messages are never expired - */ - @Test - public void testNondroppable() throws UnknownHostException - { - OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); - long nanoTimeBeforeEnqueue = System.nanoTime(); - - assertFalse("Fresh OutboundTcpConnection contains expired messages", - otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); - - fillToPurgeSize(otc, VERB_NONDROPPABLE); - fillToPurgeSize(otc, VERB_NONDROPPABLE); - otc.expireMessages(expirationTimeNanos()); - - assertFalse("OutboundTcpConnection with non-droppable verbs should not expire", - otc.backlogContainsExpiredMessages(expirationTimeNanos())); - } - - /** - * Tests that droppable messages will be dropped after they expire, but not before. 
- * - * @throws UnknownHostException - */ - @Test - public void testDroppable() throws UnknownHostException - { - OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost(); - long nanoTimeBeforeEnqueue = System.nanoTime(); - - initialFill(otc, VERB_DROPPABLE); - assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately", - otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); - - otc.expireMessages(nanoTimeBeforeEnqueue); - assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration", - otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); - - // Lets presume, expiration time have passed => At that time there shall be expired messages in the Queue - long nanoTimeWhenExpired = expirationTimeNanos(); - assertTrue("OutboundTcpConnection with droppable verbs should have expired", - otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); - - // Using the same timestamp, lets expire them and check whether they have gone - otc.expireMessages(nanoTimeWhenExpired); - assertFalse("OutboundTcpConnection should not have expired entries", - otc.backlogContainsExpiredMessages(nanoTimeWhenExpired)); - - // Actually the previous test can be done in a harder way: As expireMessages() has run, we cannot have - // ANY expired values, thus lets test also against nanoTimeBeforeEnqueue - assertFalse("OutboundTcpConnection should not have any expired entries", - otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue)); - - } - - /** - * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), elements. The first - * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be - * droppable or non-droppable. 
- */ - private void initialFill(OutboundTcpConnection otc, Verb verb) - { - assertFalse("Fresh OutboundTcpConnection contains expired messages", - otc.backlogContainsExpiredMessages(System.nanoTime())); - - fillToPurgeSize(otc, VERB_NONDROPPABLE); - MessageOut<?> messageDroppable10s = new MessageOut<>(verb); - otc.enqueue(messageDroppable10s, nextMessageId()); - otc.expireMessages(System.nanoTime()); - } - - /** - * Returns a nano timestamp in the far future, when expiration should have been performed for VERB_DROPPABLE. - * The offset is chosen as 2 times of the expiration time of VERB_DROPPABLE. - * - * @return The future nano timestamp - */ - private long expirationTimeNanos() - { - return System.nanoTime() + NANOS_FOR_TIMEOUT; - } - - private int nextMessageId() - { - return messageId.incrementAndGet(); - } - - /** - * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At BACKLOG_PURGE_SIZE expiration starts to work. - * - * @param otc - * The OutboundTcpConnection - * @param verb - * The verb that defines the message type - */ - private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb) - { - for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++) - { - otc.enqueue(new MessageOut<>(verb), nextMessageId()); - } - } - - private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException - { - InetAddress lo = InetAddress.getByName("127.0.0.1"); - OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo, null); - OutboundTcpConnection otc = new OutboundTcpConnection(otcPool, "lo-OutboundTcpConnectionTest"); - return otc; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java b/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java new file mode 
100644 index 0000000..959c37a --- /dev/null +++ b/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.io.util.SafeMemory; +import org.apache.cassandra.net.MessagingService; +import 
org.apache.cassandra.schema.KeyspaceParams;
+
+/**
+ * Compares the bytes produced by {@link ByteBufDataOutputPlus} (backed by a netty {@link ByteBuf})
+ * with the bytes produced by {@link DataOutputBuffer} (backed by a nio {@link ByteBuffer}) for the
+ * same sequence of writes.
+ */
+public class ByteBufDataOutputPlusTest
+{
+    private static final String KEYSPACE1 = "NettyPipilineTest";
+    private static final String STANDARD1 = "Standard1";
+    private static final int columnCount = 128;
+
+    /** Buffer allocated by a test; released in {@link #tearDown()} when a test assigned it. */
+    private ByteBuf buf;
+
+    /**
+     * Initializes the daemon configuration and creates the keyspace/table used by {@link #getMessage()}.
+     */
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, columnCount, AsciiType.instance, BytesType.instance));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    /**
+     * Releases the buffer held in {@link #buf}, if the test that just ran allocated one.
+     */
+    @After
+    public void tearDown()
+    {
+        if (buf != null)
+            buf.release();
+    }
+
+    /**
+     * Serializes the same message into an exactly-sized nio buffer and an exactly-sized netty buffer,
+     * and asserts that both are filled completely and hold identical bytes.
+     */
+    @Test
+    public void compareBufferSizes() throws IOException
+    {
+        // build the message once and reuse it for sizing and for both serializations
+        final QueuedMessage queued = getMessage();
+        final int currentFrameSize = queued.message.serializedSize(MessagingService.current_version);
+
+        ByteBuffer buffer = ByteBuffer.allocateDirect(currentFrameSize);
+        queued.message.serialize(new DataOutputBuffer(buffer), MessagingService.current_version);
+        Assert.assertFalse(buffer.hasRemaining());
+        Assert.assertEquals(buffer.capacity(), buffer.position());
+
+        ByteBuf bbosOut = PooledByteBufAllocator.DEFAULT.ioBuffer(currentFrameSize, currentFrameSize);
+        try
+        {
+            queued.message.serialize(new ByteBufDataOutputPlus(bbosOut), MessagingService.current_version);
+
+            Assert.assertFalse(bbosOut.isWritable());
+            Assert.assertEquals(bbosOut.capacity(), bbosOut.writerIndex());
+
+            Assert.assertEquals(buffer.position(), bbosOut.writerIndex());
+            for (int i = 0; i < currentFrameSize; i++)
+            {
+                Assert.assertEquals(buffer.get(i), bbosOut.getByte(i));
+            }
+        }
+        finally
+        {
+            bbosOut.release();
+        }
+    }
+
+    /**
+     * Builds a mutation on {@link #STANDARD1} for key "k" that sets {@link #columnCount} columns
+     * ("val0", "val1", ...) to a 1 KiB buffer, and wraps its message in a {@link QueuedMessage}.
+     *
+     * @return the queued message; the constructor's second argument is 42 (presumably the message id)
+     */
+    private QueuedMessage getMessage()
+    {
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        // named "value" so it does not shadow the ByteBuf field "buf"
+        ByteBuffer value = ByteBuffer.allocate(1 << 10);
+        RowUpdateBuilder rowUpdateBuilder = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+                                            .clustering("bytes");
+        for (int i = 0; i < columnCount; i++)
+            rowUpdateBuilder.add("val" + i, value);
+
+        Mutation mutation = rowUpdateBuilder.build();
+        return new QueuedMessage(mutation.createMessage(), 42);
+    }
+
+    /**
+     * Runs the same sequence of writes (see {@link #write(DataOutputPlus)}) against both output
+     * implementations and asserts that they produced the same number of bytes with the same content.
+     */
+    @Test
+    public void compareDOS() throws IOException
+    {
+        buf = PooledByteBufAllocator.DEFAULT.ioBuffer(1024, 1024);
+        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+
+        ByteBufDataOutputPlus byteBufDataOutputPlus = new ByteBufDataOutputPlus(buf);
+        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer(buffer);
+
+        write(byteBufDataOutputPlus);
+        write(dataOutputBuffer);
+
+        Assert.assertEquals(buffer.position(), buf.writerIndex());
+        for (int i = 0; i < buffer.position(); i++)
+        {
+            Assert.assertEquals(buffer.get(i), buf.getByte(i));
+        }
+    }
+
+    /**
+     * Writes a fixed sequence of values to {@code out}: a heap and a direct {@link ByteBuffer},
+     * primitives, a byte array, strings, vints and an off-heap {@link Memory} region.
+     *
+     * @param out the output under test
+     * @throws IOException if any write fails
+     */
+    private void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer b = ByteBuffer.allocate(8);
+        b.putLong(29811134237462734L);
+        // flip so the 8 bytes just put are readable; without it remaining() == 0 and nothing is written
+        b.flip();
+        out.write(b);
+        b = ByteBuffer.allocateDirect(8);
+        b.putDouble(92367.4253647890626);
+        b.flip();
+        out.write(b);
+
+        out.writeInt(29319236);
+
+        byte[] array = new byte[17];
+        for (int i = 0; i < array.length; i++)
+            array[i] = (byte)i;
+        out.write(array, 0 , array.length);
+
+        out.write(42);
+        out.writeUTF("This is a great string!!");
+        out.writeByte(-100);
+        out.writeUnsignedVInt(3247634L);
+        out.writeVInt(12313695L);
+        out.writeBoolean(true);
+        out.writeShort(4371);
+        out.writeChar('j');
+        out.writeLong(472348263487234L);
+        out.writeFloat(34534.12623F);
+        out.writeDouble(0.2384253D);
+        out.writeBytes("Write my bytes");
+        out.writeChars("These are some swell chars");
+
+        Memory memory = new SafeMemory(8);
+        try
+        {
+            memory.setLong(0, -21365123651231L);
+            out.write(memory, 0, memory.size());
+        }
+        finally
+        {
+            // close the off-heap region even when the write above throws
+            memory.close();
+        }
+    }
+
+    /**
+     * Expects {@code applyToChannel} to be rejected with {@link UnsupportedOperationException}.
+     */
+    @Test (expected = UnsupportedOperationException.class)
+    public void applyToChannel() throws IOException
+    {
+        ByteBufDataOutputPlus out = new ByteBufDataOutputPlus(Unpooled.wrappedBuffer(new byte[0]));
+        out.applyToChannel(null);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org