http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 41771e7..6caada1 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -17,45 +17,63 @@ */ package org.apache.cassandra.net; -import java.io.*; +import java.io.IOError; +import java.io.IOException; import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ServerSocketChannel; -import java.util.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - import javax.management.MBeanServer; import javax.management.ObjectName; -import javax.net.ssl.SSLHandshakeException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.cliffc.high_scale_lib.NonBlockingHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.carrotsearch.hppc.IntObjectMap; import com.carrotsearch.hppc.IntObjectOpenHashMap; +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.concurrent.ExecutorLocals; +import org.apache.cassandra.concurrent.LocalAwareExecutorService; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.concurrent.LocalAwareExecutorService; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; -import org.apache.cassandra.db.*; -import org.apache.cassandra.batchlog.Batch; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.SnapshotCommand; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.TruncateResponse; +import org.apache.cassandra.db.Truncation; +import org.apache.cassandra.db.WriteResponse; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.dht.IPartitioner; @@ -70,22 +88,32 @@ import org.apache.cassandra.hints.HintResponse; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.metrics.DroppedMessageMetrics; import org.apache.cassandra.metrics.MessagingMetrics; +import org.apache.cassandra.net.async.OutboundMessagingPool; +import org.apache.cassandra.net.async.NettyFactory; +import org.apache.cassandra.net.async.NettyFactory.InboundInitializer; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.schema.MigrationManager; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.security.SSLFactory; -import org.apache.cassandra.service.*; +import org.apache.cassandra.service.AbstractWriteResponseHandler; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PrepareResponse; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.BooleanSerializer; +import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.NativeLibrary; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.StatusLogger; +import org.apache.cassandra.utils.UUIDSerializer; import org.apache.cassandra.utils.concurrent.SimpleCondition; public final class MessagingService implements MessagingServiceMBean @@ -323,7 +351,7 @@ public final class MessagingService implements MessagingServiceMBean }}; /** - * Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it is. + * Messages we receive from peers have a Verb that tells us what kind of message it is. * Most of the time, this is enough to determine how to deserialize the message payload. * The exception is the REQUEST_RESPONSE verb, which just means "a reply to something you told me to do." * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and @@ -418,12 +446,12 @@ public final class MessagingService implements MessagingServiceMBean private final Map<Verb, IVerbHandler> verbHandlers; @VisibleForTesting - final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>(); + public final ConcurrentMap<InetAddress, OutboundMessagingPool> channelManagers = new NonBlockingHashMap<>(); + final List<ServerChannel> serverChannels = Lists.newArrayList(); private static final Logger logger = LoggerFactory.getLogger(MessagingService.class); private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000; - private final List<SocketThread> socketThreads = Lists.newArrayList(); private final SimpleCondition listenGate; /** @@ -533,9 +561,7 @@ public final class MessagingService implements MessagingServiceMBean maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout); ConnectionMetrics.totalTimeouts.mark(); - OutboundTcpConnectionPool cp = getConnectionPool(expiredCallbackInfo.target); - if (cp != null) - cp.incrementTimeout(); + markTimeout(expiredCallbackInfo.target); if (expiredCallbackInfo.callback.supportsBackPressure()) { @@ -606,12 +632,9 @@ public final class MessagingService implements MessagingServiceMBean { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { - OutboundTcpConnectionPool cp = getConnectionPool(host); - if (cp != null) - { - BackPressureState backPressureState = cp.getBackPressureState(); + BackPressureState backPressureState = getBackPressureState(host); + if (backPressureState != null) backPressureState.onMessageSent(message); - } } } @@ -626,15 +649,13 @@ public final class MessagingService implements MessagingServiceMBean { if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) { - OutboundTcpConnectionPool cp = getConnectionPool(host); - if (cp != null) - { - BackPressureState backPressureState = cp.getBackPressureState(); - if (!timeout) - backPressureState.onResponseReceived(); - else - backPressureState.onResponseTimeout(); - } + BackPressureState backPressureState = getBackPressureState(host); + if (backPressureState == null) + return; + if (!timeout) + backPressureState.onResponseReceived(); + else + backPressureState.onResponseTimeout(); } } @@ -656,14 +677,27 @@ public final class MessagingService implements MessagingServiceMBean { if (host.equals(FBUtilities.getBroadcastAddress())) continue; - OutboundTcpConnectionPool cp = getConnectionPool(host); - if (cp != null) - states.add(cp.getBackPressureState()); + OutboundMessagingPool pool = getMessagingConnection(host); + if (pool != null) + states.add(pool.getBackPressureState()); } backPressure.apply(states, timeoutInNanos, TimeUnit.NANOSECONDS); } } + BackPressureState getBackPressureState(InetAddress host) + { + OutboundMessagingPool messagingConnection = getMessagingConnection(host); + return messagingConnection != null ? messagingConnection.getBackPressureState() : null; + } + + void markTimeout(InetAddress addr) + { + OutboundMessagingPool conn = channelManagers.get(addr); + if (conn != null) + conn.incrementTimeout(); + } + /** * Track latency information for the dynamic snitch * @@ -688,30 +722,25 @@ public final class MessagingService implements MessagingServiceMBean */ public void convict(InetAddress ep) { - OutboundTcpConnectionPool cp = getConnectionPool(ep); - if (cp != null) - { - logger.trace("Resetting pool for {}", ep); - cp.reset(); - } - else - { - logger.debug("Not resetting pool for {} because internode authenticator said not to connect", ep); - } + logger.trace("Resetting pool for {}", ep); + reset(ep); } public void listen() { callbacks.reset(); // hack to allow tests to stop/restart MS listen(FBUtilities.getLocalAddress()); - if (DatabaseDescriptor.shouldListenOnBroadcastAddress() - && !FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress())) - { + if (shouldListenOnBroadcastAddress()) listen(FBUtilities.getBroadcastAddress()); - } listenGate.signalAll(); } + public static boolean shouldListenOnBroadcastAddress() + { + return DatabaseDescriptor.shouldListenOnBroadcastAddress() + && !FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress()); + } + /** * Listen on the specified port. * @@ -719,80 +748,64 @@ public final class MessagingService implements MessagingServiceMBean */ private void listen(InetAddress localEp) throws ConfigurationException { - for (ServerSocket ss : getServerSockets(localEp)) + IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator(); + int receiveBufferSize = DatabaseDescriptor.getInternodeRecvBufferSize(); + + if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none) { - SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp); - th.start(); - socketThreads.add(th); + InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort()); + ChannelGroup channelGroup = new DefaultChannelGroup("EncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups()); + InboundInitializer initializer = new InboundInitializer(authenticator, DatabaseDescriptor.getServerEncryptionOptions(), channelGroup); + Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); + serverChannels.add(new ServerChannel(encryptedChannel, channelGroup)); } + + if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all) + { + InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); + ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups()); + InboundInitializer initializer = new InboundInitializer(authenticator, null, channelGroup); + Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize); + serverChannels.add(new ServerChannel(channel, channelGroup)); + } + + if (serverChannels.isEmpty()) + throw new IllegalStateException("no listening channels set up in MessagingService!"); } - @SuppressWarnings("resource") - private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException + /** + * A simple struct to wrap up the the components needed for each listening socket. + */ + @VisibleForTesting + static class ServerChannel { - final List<ServerSocket> ss = new ArrayList<ServerSocket>(2); - if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none) + /** + * The base {@link Channel} that is doing the spcket listen/accept. + */ + private final Channel channel; + + /** + * A group of the open, inbound {@link Channel}s connected to this node. This is mostly interesting so that all of + * the inbound connections/channels can be closed when the listening socket itself is being closed. + */ + private final ChannelGroup connectedChannels; + + private ServerChannel(Channel channel, ChannelGroup channelGroup) { - try - { - ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort())); - } - catch (IOException e) - { - throw new ConfigurationException("Unable to create ssl socket", e); - } - // setReuseAddress happens in the factory. - logger.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort()); + this.channel = channel; + this.connectedChannels = channelGroup; } - if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all) + void close() { - ServerSocketChannel serverChannel = null; - try - { - serverChannel = ServerSocketChannel.open(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - ServerSocket socket = serverChannel.socket(); - try - { - socket.setReuseAddress(true); - } - catch (SocketException e) - { - FileUtils.closeQuietly(socket); - throw new ConfigurationException("Insufficient permissions to setReuseAddress", e); - } - InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()); - try - { - socket.bind(address,500); - } - catch (BindException e) - { - FileUtils.closeQuietly(socket); - if (e.getMessage().contains("in use")) - throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services"); - else if (e.getMessage().contains("Cannot assign requested address")) - throw new ConfigurationException("Unable to bind to address " + address - + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2"); - else - throw new RuntimeException(e); - } - catch (IOException e) - { - FileUtils.closeQuietly(socket); - throw new RuntimeException(e); - } - String nic = FBUtilities.getNetworkInterface(localEp); - logger.info("Starting Messaging Service on {}:{}{}", localEp, DatabaseDescriptor.getStoragePort(), - nic == null? "" : String.format(" (%s)", nic)); - ss.add(socket); + channel.close().syncUninterruptibly(); + connectedChannels.close().syncUninterruptibly(); + } + int size() + + { + return connectedChannels.size(); } - return ss; } public void waitUntilListening() @@ -812,53 +825,42 @@ public final class MessagingService implements MessagingServiceMBean return listenGate.isSignaled(); } + public void destroyConnectionPool(InetAddress to) { - OutboundTcpConnectionPool cp = connectionManagers.get(to); - if (cp == null) - return; - cp.close(); - connectionManagers.remove(to); + OutboundMessagingPool pool = channelManagers.remove(to); + if (pool != null) + pool.close(true); } /** - * Get a connection pool to the specified endpoint. Constructs one if none exists. + * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the + * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used + * for communication between EC2 regions. * - * Can return null if the InternodeAuthenticator fails to authenticate the node. - * @param to - * @return The connection pool or null if internode authenticator says not to + * @param address IP Address to identify the peer + * @param preferredAddress IP Address to use (and prefer) going forward for connecting to the peer */ - public OutboundTcpConnectionPool getConnectionPool(InetAddress to) + public void reconnectWithNewIp(InetAddress address, InetAddress preferredAddress) { - OutboundTcpConnectionPool cp = connectionManagers.get(to); - if (cp == null) - { - //Don't attempt to connect to nodes that won't (or shouldn't) authenticate anyways - if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, OutboundTcpConnectionPool.portFor(to))) - return null; + SystemKeyspace.updatePreferredIP(address, preferredAddress); - cp = new OutboundTcpConnectionPool(to, backPressure.newState(to)); - OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); - if (existingPool != null) - cp = existingPool; - else - cp.start(); - } - cp.waitForStarted(); - return cp; + OutboundMessagingPool messagingPool = channelManagers.get(address); + if (messagingPool != null) + messagingPool.reconnectWithNewIp(new InetSocketAddress(preferredAddress, portFor(address))); } - /** - * Get a connection for a message to a specific endpoint. Constructs one if none exists. - * - * Can return null if the InternodeAuthenticator fails to authenticate the node. - * @param to - * @return The connection or null if internode authenticator says not to - */ - public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) + private void reset(InetAddress address) { - OutboundTcpConnectionPool cp = getConnectionPool(to); - return cp == null ? null : cp.getConnection(msg); + OutboundMessagingPool messagingPool = channelManagers.remove(address); + if (messagingPool != null) + messagingPool.close(false); + } + + public InetAddress getCurrentEndpoint(InetAddress publicAddress) + { + OutboundMessagingPool messagingPool = getMessagingConnection(publicAddress); + return messagingPool != null ? messagingPool.getPreferredRemoteAddr().getAddress() : null; } /** @@ -1008,12 +1010,9 @@ public final class MessagingService implements MessagingServiceMBean if (!ms.allowOutgoingMessage(message, id, to)) return; - // get pooled connection (really, connection queue) - OutboundTcpConnection connection = getConnection(to, message); - - // write it - if (connection != null) - connection.enqueue(message, id); + OutboundMessagingPool outboundMessagingPool = getMessagingConnection(to); + if (outboundMessagingPool != null) + outboundMessagingPool.sendMessage(message, id); } public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to) @@ -1049,18 +1048,17 @@ public final class MessagingService implements MessagingServiceMBean // attempt to humor tests that try to stop and restart MS try { - for (SocketThread th : socketThreads) - try - { - th.close(); - } - catch (IOException e) - { - // see https://issues.apache.org/jira/browse/CASSANDRA-10545 - handleIOExceptionOnClose(e); - } + // first close the recieve channels + for (ServerChannel serverChannel : serverChannels) + serverChannel.close(); + + // now close the send channels + for (OutboundMessagingPool pool : channelManagers.values()) + pool.close(false); + + NettyFactory.instance.close(); } - catch (IOException e) + catch (Exception e) { throw new IOError(e); } @@ -1281,109 +1279,13 @@ public final class MessagingService implements MessagingServiceMBean return ret; } - @VisibleForTesting - public static class SocketThread extends Thread - { - private final ServerSocket server; - @VisibleForTesting - public final Set<Closeable> connections = Sets.newConcurrentHashSet(); - - SocketThread(ServerSocket server, String name) - { - super(name); - this.server = server; - } - - @SuppressWarnings("resource") - public void run() - { - while (!server.isClosed()) - { - Socket socket = null; - try - { - socket = server.accept(); - if (!authenticate(socket)) - { - logger.trace("remote failed to authenticate"); - socket.close(); - continue; - } - - socket.setKeepAlive(true); - socket.setSoTimeout(2 * OutboundTcpConnection.WAIT_FOR_VERSION_MAX_TIME); - // determine the connection type to decide whether to buffer - DataInputStream in = new DataInputStream(socket.getInputStream()); - MessagingService.validateMagic(in.readInt()); - int header = in.readInt(); - boolean isStream = MessagingService.getBits(header, 3, 1) == 1; - int version = MessagingService.getBits(header, 15, 8); - logger.trace("Connection version {} from {}", version, socket.getInetAddress()); - socket.setSoTimeout(0); - - Thread thread = isStream - ? new IncomingStreamingConnection(version, socket, connections) - : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket, connections); - thread.start(); - connections.add((Closeable) thread); - } - catch (AsynchronousCloseException e) - { - // this happens when another thread calls close(). - logger.trace("Asynchronous close seen by server thread"); - break; - } - catch (ClosedChannelException e) - { - logger.trace("MessagingService server thread already closed"); - break; - } - catch (SSLHandshakeException e) - { - logger.error("SSL handshake error for inbound connection from " + socket, e); - FileUtils.closeQuietly(socket); - } - catch (Throwable t) - { - logger.trace("Error reading the socket {}", socket, t); - FileUtils.closeQuietly(socket); - } - } - logger.info("MessagingService has terminated the accept() thread"); - } - - void close() throws IOException - { - logger.trace("Closing accept() thread"); - - try - { - server.close(); - } - catch (IOException e) - { - // see https://issues.apache.org/jira/browse/CASSANDRA-8220 - // see https://issues.apache.org/jira/browse/CASSANDRA-12513 - handleIOExceptionOnClose(e); - } - for (Closeable connection : connections) - { - connection.close(); - } - } - - private boolean authenticate(Socket socket) - { - return DatabaseDescriptor.getInternodeAuthenticator().authenticate(socket.getInetAddress(), socket.getPort()); - } - } private static void handleIOExceptionOnClose(IOException e) throws IOException { // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20 // see https://bugs.openjdk.java.net/browse/JDK-8050499; // also CASSANDRA-12513 - if ("Mac OS X".equals(System.getProperty("os.name"))) + if (NativeLibrary.osType == NativeLibrary.OSType.MAC) { switch (e.getMessage()) { @@ -1398,79 +1300,73 @@ public final class MessagingService implements MessagingServiceMBean public Map<String, Integer> getLargeMessagePendingTasks() { - Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getPendingMessages()); + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getPendingMessages()); return pendingTasks; } - public int getLargeMessagePendingTasks(InetAddress address) - { - OutboundTcpConnectionPool connection = connectionManagers.get(address); - return connection == null ? 0 : connection.largeMessages.getPendingMessages(); - } - public Map<String, Long> getLargeMessageCompletedTasks() { - Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getCompletedMesssages()); + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getLargeMessageDroppedTasks() { - Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getDroppedMessages()); + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessageChannel.getDroppedMessages()); return droppedTasks; } public Map<String, Integer> getSmallMessagePendingTasks() { - Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getPendingMessages()); + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getPendingMessages()); return pendingTasks; } public Map<String, Long> getSmallMessageCompletedTasks() { - Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getCompletedMesssages()); + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getSmallMessageDroppedTasks() { - Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getDroppedMessages()); + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessageChannel.getDroppedMessages()); return droppedTasks; } public Map<String, Integer> getGossipMessagePendingTasks() { - Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getPendingMessages()); + Map<String, Integer> pendingTasks = new HashMap<String, Integer>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getPendingMessages()); return pendingTasks; } public Map<String, Long> getGossipMessageCompletedTasks() { - Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getCompletedMesssages()); + Map<String, Long> completedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getCompletedMessages()); return completedTasks; } public Map<String, Long> getGossipMessageDroppedTasks() { - Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) - droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getDroppedMessages()); + Map<String, Long> droppedTasks = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) + droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipChannel.getDroppedMessages()); return droppedTasks; } @@ -1490,8 +1386,8 @@ public final class MessagingService implements MessagingServiceMBean public Map<String, Long> getTimeoutsPerHost() { - Map<String, Long> result = new HashMap<String, Long>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet()) + Map<String, Long> result = new HashMap<String, Long>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) { String ip = entry.getKey().getHostAddress(); long recent = entry.getValue().getTimeouts(); @@ -1502,8 +1398,8 @@ public final class MessagingService implements MessagingServiceMBean public Map<String, Double> getBackPressurePerHost() { - Map<String, Double> map = new HashMap<>(connectionManagers.size()); - for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) + Map<String, Double> map = new HashMap<>(channelManagers.size()); + for (Map.Entry<InetAddress, OutboundMessagingPool> entry : channelManagers.entrySet()) map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit()); return map; @@ -1540,9 +1436,72 @@ public final class MessagingService implements MessagingServiceMBean bounds.left.getPartitioner().getClass().getName())); } + private OutboundMessagingPool getMessagingConnection(InetAddress to) + { + OutboundMessagingPool pool = channelManagers.get(to); + if (pool == null) + { + final boolean secure = isEncryptedConnection(to); + final int port = portFor(secure); + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port)) + return null; + + InetSocketAddress preferredRemote = new InetSocketAddress(SystemKeyspace.getPreferredIP(to), port); + InetSocketAddress local = new InetSocketAddress(FBUtilities.getLocalAddress(), 0); + ServerEncryptionOptions encryptionOptions = secure ? DatabaseDescriptor.getServerEncryptionOptions() : null; + IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator(); + + pool = new OutboundMessagingPool(preferredRemote, local, encryptionOptions, backPressure.newState(to), authenticator); + OutboundMessagingPool existing = channelManagers.putIfAbsent(to, pool); + if (existing != null) + { + pool.close(false); + pool = existing; + } + } + return pool; + } + + public static int portFor(InetAddress addr) + { + final boolean secure = isEncryptedConnection(addr); + return portFor(secure); + } + + private static int portFor(boolean secure) + { + return secure ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); + } + @VisibleForTesting - public List<SocketThread> getSocketThreads() + boolean isConnected(InetAddress address, MessageOut messageOut) { - return socketThreads; + OutboundMessagingPool pool = channelManagers.get(address); + if (pool == null) + return false; + return pool.getConnection(messageOut).isConnected(); + } + + public static boolean isEncryptedConnection(InetAddress address) + { + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption) + { + case none: + return false; // if nothing needs to be encrypted then return immediately. + case all: + break; + case dc: + if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + return false; + break; + case rack: + // for rack then check if the DC's are the same. + if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress())) + && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) + return false; + break; + } + return true; } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java deleted file mode 100644 index 42abbe6..0000000 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ /dev/null @@ -1,693 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net; - -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.zip.Checksum; - -import javax.net.ssl.SSLHandshakeException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.util.concurrent.FastThreadLocalThread; -import net.jpountz.lz4.LZ4BlockOutputStream; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.xxhash.XXHashFactory; - -import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; -import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.CoalescingStrategies; -import org.apache.cassandra.utils.CoalescingStrategies.Coalescable; -import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Uninterruptibles; - -public class OutboundTcpConnection extends FastThreadLocalThread -{ - private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class); - - private static final String PREFIX = Config.PROPERTY_PREFIX; - - /* - * Enabled/disable TCP_NODELAY for intradc connections. Defaults to enabled. - */ - private static final String INTRADC_TCP_NODELAY_PROPERTY = PREFIX + "otc_intradc_tcp_nodelay"; - private static final boolean INTRADC_TCP_NODELAY = Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true")); - - /* - * Size of buffer in output stream - */ - private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size"; - private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64); - - //Size of 3 elements added to every message - private static final int PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE = 12; - - public static final int MAX_COALESCED_MESSAGES = 128; - - private static CoalescingStrategy newCoalescingStrategy(String displayName) - { - return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(), - DatabaseDescriptor.getOtcCoalescingWindow(), - logger, - displayName); - } - - static - { - String strategy = DatabaseDescriptor.getOtcCoalescingStrategy(); - switch (strategy) - { - case "TIMEHORIZON": - break; - case "MOVINGAVERAGE": - case "FIXED": - case "DISABLED": - logger.info("OutboundTcpConnection using coalescing strategy {}", strategy); - break; - default: - //Check that it can be loaded - newCoalescingStrategy("dummy"); - } - - int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow(); - if (coalescingWindow != Config.otc_coalescing_window_us_default) - logger.info("OutboundTcpConnection coalescing window set to {}μs", coalescingWindow); - - if (coalescingWindow < 0) - throw new ExceptionInInitializerError( - "Value provided for coalescing window must be greater than 0: " + coalescingWindow); - - int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval(); - if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default) - logger.info("OutboundTcpConnection backlog expiration interval set to to {}ms", otc_backlog_expiration_interval_in_ms); - } - - private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE); - private volatile boolean isStopped = false; - - private static final int OPEN_RETRY_DELAY = 100; // ms between retries - public static final int WAIT_FOR_VERSION_MAX_TIME = 5000; - private static final int NO_VERSION = Integer.MIN_VALUE; - - static final int LZ4_HASH_SEED = 0x9747b28c; - - private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>(); - private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + "otc_backlog_purge_size"; - @VisibleForTesting - static final int BACKLOG_PURGE_SIZE = Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024); - private final AtomicBoolean backlogExpirationActive = new AtomicBoolean(false); - private volatile long backlogNextExpirationTime; - - private final OutboundTcpConnectionPool poolReference; - - private final CoalescingStrategy cs; - private DataOutputStreamPlus out; - private Socket socket; - private volatile long completed; - private final AtomicLong dropped = new AtomicLong(); - private volatile int currentMsgBufferCount = 0; - private volatile int targetVersion; - - public OutboundTcpConnection(OutboundTcpConnectionPool pool, String name) - { - super("MessagingService-Outgoing-" + pool.endPoint() + "-" + name); - this.poolReference = pool; - cs = newCoalescingStrategy(pool.endPoint().getHostAddress()); - - // We want to use the most precise version we know because while there is version detection on connect(), - // the target version might be accessed by the pool (in getConnection()) before we actually connect (as we - // connect when the first message is submitted). Note however that the only case where we'll connect - // without knowing the true version of a node is if that node is a seed (otherwise, we can't know a node - // unless it has been gossiped to us or it has connected to us and in both case this sets the version) and - // in that case we won't rely on that targetVersion before we're actually connected and so the version - // detection in connect() will do its job. - targetVersion = MessagingService.instance().getVersion(pool.endPoint()); - } - - private static boolean isLocalDC(InetAddress targetHost) - { - String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - return remoteDC.equals(localDC); - } - - public void enqueue(MessageOut<?> message, int id) - { - long nanoTime = System.nanoTime(); - expireMessages(nanoTime); - try - { - backlog.put(new QueuedMessage(message, id, nanoTime)); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - - /** - * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as - * this method is iterating the queue which can be an expensive operation (CPU time, queue locking). - * - * @return true, if the queue contains at least one expired element - */ - @VisibleForTesting // (otherwise = VisibleForTesting.NONE) - boolean backlogContainsExpiredMessages(long nowNanos) - { - return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos)); - } - - void closeSocket(boolean destroyThread) - { - logger.debug("Enqueuing socket close for {}", poolReference.endPoint()); - isStopped = destroyThread; // Exit loop to stop the thread - backlog.clear(); - // in the "destroyThread = true" case, enqueuing the sentinel is important mostly to unblock the backlog.take() - // (via the CoalescingStrategy) in case there's a data race between this method enqueuing the sentinel - // and run() clearing the backlog on connection failure. - enqueue(CLOSE_SENTINEL, -1); - } - - void softCloseSocket() - { - enqueue(CLOSE_SENTINEL, -1); - } - - public int getTargetVersion() - { - return targetVersion; - } - - public void run() - { - final int drainedMessageSize = MAX_COALESCED_MESSAGES; - // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize) - final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize); - - outer: - while (!isStopped) - { - try - { - cs.coalesce(backlog, drainedMessages, drainedMessageSize); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - int count = currentMsgBufferCount = drainedMessages.size(); - - //The timestamp of the first message has already been provided to the coalescing strategy - //so skip logging it. - inner: - for (QueuedMessage qm : drainedMessages) - { - try - { - MessageOut<?> m = qm.message; - if (m == CLOSE_SENTINEL) - { - disconnect(); - if (isStopped) - break outer; - continue; - } - - if (qm.isTimedOut(System.nanoTime())) - dropped.incrementAndGet(); - else if (socket != null || connect()) - writeConnected(qm, count == 1 && backlog.isEmpty()); - else - { - // Not connected! Clear out the queue, else gossip messages back up. Update dropped - // statistics accordingly. Hint: The statistics may be slightly too low, if messages - // are added between the calls of backlog.size() and backlog.clear() - dropped.addAndGet(backlog.size()); - backlog.clear(); - break inner; - } - } - catch (InternodeAuthFailed e) - { - logger.warn("Internode auth failed connecting to {}", poolReference.endPoint()); - //Remove the connection pool and other thread so messages aren't queued - MessagingService.instance().destroyConnectionPool(poolReference.endPoint()); - } - catch (Exception e) - { - JVMStabilityInspector.inspectThrowable(e); - // really shouldn't get here, as exception handling in writeConnected() is reasonably robust - // but we want to catch anything bad we don't drop the messages in the current batch - logger.error("error processing a message intended for {}", poolReference.endPoint(), e); - } - currentMsgBufferCount = --count; - } - // Update dropped statistics by the number of unprocessed drainedMessages - dropped.addAndGet(currentMsgBufferCount); - drainedMessages.clear(); - } - } - - public int getPendingMessages() - { - return backlog.size() + currentMsgBufferCount; - } - - public long getCompletedMesssages() - { - return completed; - } - - public long getDroppedMessages() - { - return dropped.get(); - } - - private boolean shouldCompressConnection() - { - // assumes version >= 1.2 - return DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all - || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint())); - } - - private void writeConnected(QueuedMessage qm, boolean flush) - { - try - { - byte[] sessionBytes = qm.message.parameters.get(Tracing.TRACE_HEADER); - if (sessionBytes != null) - { - UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - TraceState state = Tracing.instance.get(sessionId); - String message = String.format("Sending %s message to %s message size %d bytes", qm.message.verb, - poolReference.endPoint(), - qm.message.serializedSize(targetVersion) + PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE); - // session may have already finished; see CASSANDRA-5668 - if (state == null) - { - byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE); - Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]); - Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), message, traceType.getTTL()); - } - else - { - state.trace(message); - if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE) - Tracing.instance.doneWithNonLocalSession(state); - } - } - - long timestampMillis = NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos); - writeInternal(qm.message, qm.id, timestampMillis); - - completed++; - if (flush) - out.flush(); - } - catch (Throwable e) - { - JVMStabilityInspector.inspectThrowable(e); - disconnect(); - if (e instanceof IOException || e.getCause() instanceof IOException) - { - logger.debug("Error writing to {}", poolReference.endPoint(), e); - - // If we haven't retried this message yet, put it back on the queue to retry after re-connecting. - // See CASSANDRA-5393 and CASSANDRA-12192. - if (qm.shouldRetry()) - { - try - { - backlog.put(new RetriedQueuedMessage(qm)); - } - catch (InterruptedException e1) - { - throw new AssertionError(e1); - } - } - } - else - { - // Non IO exceptions are likely a programming error so let's not silence them - logger.error("error writing to {}", poolReference.endPoint(), e); - } - } - } - - private void writeInternal(MessageOut<?> message, int id, long timestamp) throws IOException - { - //If you add/remove fields before the message don't forget to update PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(id); - - // int cast cuts off the high-order half of the timestamp, which we can assume remains - // the same between now and when the recipient reconstructs it. - out.writeInt((int) timestamp); - message.serialize(out, targetVersion); - } - - private static void writeHeader(DataOutput out, int version, boolean compressionEnabled) throws IOException - { - // 2 bits: unused. used to be "serializer type," which was always Binary - // 1 bit: compression - // 1 bit: streaming mode - // 3 bits: unused - // 8 bits: version - // 15 bits: unused - int header = 0; - if (compressionEnabled) - header |= 4; - header |= (version << 8); - out.writeInt(header); - } - - private void disconnect() - { - if (socket != null) - { - try - { - socket.close(); - logger.debug("Socket to {} closed", poolReference.endPoint()); - } - catch (IOException e) - { - logger.debug("Exception closing connection to {}", poolReference.endPoint(), e); - } - out = null; - socket = null; - } - } - - @SuppressWarnings("resource") - private boolean connect() throws InternodeAuthFailed - { - InetAddress endpoint = poolReference.endPoint(); - if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, poolReference.portFor(endpoint))) - { - throw new InternodeAuthFailed(); - } - - logger.debug("Attempting to connect to {}", endpoint); - - - long start = System.nanoTime(); - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout()); - while (System.nanoTime() - start < timeout) - { - targetVersion = MessagingService.instance().getVersion(endpoint); - try - { - socket = poolReference.newSocket(); - socket.setKeepAlive(true); - if (isLocalDC(endpoint)) - { - socket.setTcpNoDelay(INTRADC_TCP_NODELAY); - } - else - { - socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay()); - } - if (DatabaseDescriptor.getInternodeSendBufferSize() > 0) - { - try - { - socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize()); - } - catch (SocketException se) - { - logger.warn("Failed to set send buffer size on internode socket.", se); - } - } - - // SocketChannel may be null when using SSL - WritableByteChannel ch = socket.getChannel(); - out = new BufferedDataOutputStreamPlus(ch != null ? ch : Channels.newChannel(socket.getOutputStream()), BUFFER_SIZE); - - out.writeInt(MessagingService.PROTOCOL_MAGIC); - writeHeader(out, targetVersion, shouldCompressConnection()); - out.flush(); - - DataInputStream in = new DataInputStream(socket.getInputStream()); - int maxTargetVersion = handshakeVersion(in); - if (maxTargetVersion == NO_VERSION) - { - // no version is returned, so disconnect an try again - logger.trace("Target max version is {}; no version information yet, will retry", maxTargetVersion); - disconnect(); - continue; - } - else - { - MessagingService.instance().setVersion(endpoint, maxTargetVersion); - } - - if (targetVersion > maxTargetVersion) - { - logger.trace("Target max version is {}; will reconnect with that version", maxTargetVersion); - try - { - if (DatabaseDescriptor.getSeeds().contains(endpoint)) - logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion); - } - catch (Throwable e) - { - // If invalid yaml has been added to the config since startup, getSeeds() will throw an AssertionError - // Additionally, third party seed providers may throw exceptions if network is flakey - // Regardless of what's thrown, we must catch it, disconnect, and try again - JVMStabilityInspector.inspectThrowable(e); - logger.warn("Configuration error prevented outbound connection: {}", e.getLocalizedMessage()); - } - finally - { - disconnect(); - return false; - } - } - - if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version) - { - logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done", - maxTargetVersion, targetVersion); - softCloseSocket(); - } - - out.writeInt(MessagingService.current_version); - CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out); - if (shouldCompressConnection()) - { - out.flush(); - logger.trace("Upgrading OutputStream to {} to be compressed", endpoint); - - // TODO: custom LZ4 OS that supports BB write methods - LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); - Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(); - out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(), - 1 << 14, // 16k block size - compressor, - checksum, - true)); // no async flushing - } - logger.debug("Done connecting to {}", endpoint); - return true; - } - catch (SSLHandshakeException e) - { - logger.error("SSL handshake error for outbound connection to " + socket, e); - socket = null; - // SSL errors won't be recoverable within timeout period so we'll just abort - return false; - } - catch (IOException e) - { - socket = null; - logger.debug("Unable to connect to {}", endpoint, e); - Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS); - } - } - return false; - } - - private int handshakeVersion(final DataInputStream inputStream) - { - final AtomicInteger version = new AtomicInteger(NO_VERSION); - final CountDownLatch versionLatch = new CountDownLatch(1); - NamedThreadFactory.createThread(() -> - { - try - { - logger.info("Handshaking version with {}", poolReference.endPoint()); - version.set(inputStream.readInt()); - } - catch (IOException ex) - { - final String msg = "Cannot handshake version with " + poolReference.endPoint(); - if (logger.isTraceEnabled()) - logger.trace(msg, ex); - else - logger.info(msg); - } - finally - { - //unblock the waiting thread on either success or fail - versionLatch.countDown(); - } - }, "HANDSHAKE-" + poolReference.endPoint()).start(); - - try - { - versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ex) - { - throw new AssertionError(ex); - } - return version.get(); - } - - /** - * Expire elements from the queue if the queue is pretty full and expiration is not already in progress. - * This method will only remove droppable expired entries. If no such element exists, nothing is removed from the queue. - * - * @param timestampNanos The current time as from System.nanoTime() - */ - @VisibleForTesting - void expireMessages(long timestampNanos) - { - if (backlog.size() <= BACKLOG_PURGE_SIZE) - return; // Plenty of space - - if (backlogNextExpirationTime - timestampNanos > 0) - return; // Expiration is not due. - - /** - * Expiration is an expensive process. Iterating the queue locks the queue for both writes and - * reads during iter.next() and iter.remove(). Thus letting only a single Thread do expiration. - */ - if (backlogExpirationActive.compareAndSet(false, true)) - { - try - { - Iterator<QueuedMessage> iter = backlog.iterator(); - while (iter.hasNext()) - { - QueuedMessage qm = iter.next(); - if (!qm.droppable) - continue; - if (!qm.isTimedOut(timestampNanos)) - continue; - iter.remove(); - dropped.incrementAndGet(); - } - - if (logger.isTraceEnabled()) - { - long duration = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos); - logger.trace("Expiration of {} took {}μs", getName(), duration); - } - } - finally - { - long backlogExpirationIntervalNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval()); - backlogNextExpirationTime = timestampNanos + backlogExpirationIntervalNanos; - backlogExpirationActive.set(false); - } - } - } - - /** messages that have not been retried yet */ - private static class QueuedMessage implements Coalescable - { - final MessageOut<?> message; - final int id; - final long timestampNanos; - final boolean droppable; - - QueuedMessage(MessageOut<?> message, int id, long timestampNanos) - { - this.message = message; - this.id = id; - this.timestampNanos = timestampNanos; - this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb); - } - - /** don't drop a non-droppable message just because it's timestamp is expired */ - boolean isTimedOut(long nowNanos) - { - long messageTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(message.getTimeout()); - return droppable && nowNanos - timestampNanos > messageTimeoutNanos; - } - - boolean shouldRetry() - { - // retry all messages once - return true; - } - - public long timestampNanos() - { - return timestampNanos; - } - } - - private static class RetriedQueuedMessage extends QueuedMessage - { - RetriedQueuedMessage(QueuedMessage msg) - { - super(msg.message, msg.id, msg.timestampNanos); - } - - boolean shouldRetry() - { - return false; - } - } - - private static class InternodeAuthFailed extends Exception {} -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java deleted file mode 100644 index 20a8da6..0000000 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.SocketChannel; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.locator.IEndpointSnitch; -import org.apache.cassandra.metrics.ConnectionMetrics; -import org.apache.cassandra.security.SSLFactory; -import org.apache.cassandra.utils.FBUtilities; - -public class OutboundTcpConnectionPool -{ - public static final long LARGE_MESSAGE_THRESHOLD = - Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 64); - - // pointer for the real Address. - private final InetAddress id; - private final CountDownLatch started; - public final OutboundTcpConnection smallMessages; - public final OutboundTcpConnection largeMessages; - public final OutboundTcpConnection gossipMessages; - - // pointer to the reset Address. - private InetAddress resetEndpoint; - private ConnectionMetrics metrics; - - // back-pressure state linked to this connection: - private final BackPressureState backPressureState; - - OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState backPressureState) - { - id = remoteEp; - resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp); - started = new CountDownLatch(1); - - smallMessages = new OutboundTcpConnection(this, "Small"); - largeMessages = new OutboundTcpConnection(this, "Large"); - gossipMessages = new OutboundTcpConnection(this, "Gossip"); - - this.backPressureState = backPressureState; - } - - /** - * returns the appropriate connection based on message type. - * returns null if a connection could not be established. - */ - OutboundTcpConnection getConnection(MessageOut msg) - { - if (Stage.GOSSIP == msg.getStage()) - return gossipMessages; - return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD - ? largeMessages - : smallMessages; - } - - public BackPressureState getBackPressureState() - { - return backPressureState; - } - - void reset() - { - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages }) - conn.closeSocket(false); - } - - public void resetToNewerVersion(int version) - { - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages }) - { - if (version > conn.getTargetVersion()) - conn.softCloseSocket(); - } - } - - /** - * reconnect to @param remoteEP (after the current message backlog is exhausted). - * Used by Ec2MultiRegionSnitch to force nodes in the same region to communicate over their private IPs. - * @param remoteEP - */ - public void reset(InetAddress remoteEP) - { - SystemKeyspace.updatePreferredIP(id, remoteEP); - resetEndpoint = remoteEP; - for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages }) - conn.softCloseSocket(); - - // release previous metrics and create new one with reset address - metrics.release(); - metrics = new ConnectionMetrics(resetEndpoint, this); - } - - public long getTimeouts() - { - return metrics.timeouts.getCount(); - } - - - public void incrementTimeout() - { - metrics.timeouts.mark(); - } - - public Socket newSocket() throws IOException - { - return newSocket(endPoint()); - } - - @SuppressWarnings("resource") // Closing the socket will close the underlying channel. - public static Socket newSocket(InetAddress endpoint) throws IOException - { - // zero means 'bind on any available port.' - if (isEncryptedChannel(endpoint)) - { - return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort()); - } - else - { - SocketChannel channel = SocketChannel.open(); - channel.connect(new InetSocketAddress(endpoint, DatabaseDescriptor.getStoragePort())); - return channel.socket(); - } - } - - public static int portFor(InetAddress endpoint) - { - return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); - } - - public InetAddress endPoint() - { - if (id.equals(FBUtilities.getBroadcastAddress())) - return FBUtilities.getLocalAddress(); - return resetEndpoint; - } - - public static boolean isEncryptedChannel(InetAddress address) - { - IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption) - { - case none: - return false; // if nothing needs to be encrypted then return immediately. - case all: - break; - case dc: - if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) - return false; - break; - case rack: - // for rack then check if the DC's are the same. - if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress())) - && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()))) - return false; - break; - } - return true; - } - - public void start() - { - smallMessages.start(); - largeMessages.start(); - gossipMessages.start(); - - metrics = new ConnectionMetrics(id, this); - - started.countDown(); - } - - public void waitForStarted() - { - if (started.getCount() == 0) - return; - - boolean error = false; - try - { - if (!started.await(1, TimeUnit.MINUTES)) - error = true; - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - error = true; - } - if (error) - throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress())); - } - - public void close() - { - // these null guards are simply for tests - if (largeMessages != null) - largeMessages.closeSocket(true); - if (smallMessages != null) - smallMessages.closeSocket(true); - if (gossipMessages != null) - gossipMessages.closeSocket(true); - if (metrics != null) - metrics.release(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java new file mode 100644 index 0000000..f9fa07a --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import org.apache.cassandra.io.util.DataInputPlus; + +public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus +{ + public ByteBufDataInputPlus(ByteBuf buffer) + { + super(buffer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java new file mode 100644 index 0000000..0473465 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +import com.google.common.base.Function; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus; +import org.apache.cassandra.utils.memory.MemoryUtil; +import org.apache.cassandra.utils.vint.VIntCoding; + +/** + * A {@link DataOutputPlus} that uses a {@link ByteBuf} as a backing buffer. This class is completely thread unsafe and + * it is expected that the backing buffer is sized correctly for all the writes you want to do (or the buffer needs + * to be growable). + */ +public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOutputPlus +{ + private final ByteBuf buffer; + + /** + * ByteBuffer to use for defensive copies of direct {@link ByteBuffer}s - see {@link #write(ByteBuffer)}. + */ + private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer(); + + public ByteBufDataOutputPlus(ByteBuf buffer) + { + super(buffer); + this.buffer = buffer; + } + + /** + * {@inheritDoc} - "write the buffer without modifying its position" + * + * Unfortunately, netty's {@link ByteBuf#writeBytes(ByteBuffer)} modifies the byteBuffer's position, + * and that is unsafe in our world wrt multithreading. Hence we need to be careful: reference the backing array + * on heap ByteBuffers, and use a reusable "hollow" ByteBuffer ({@link #hollowBuffer}) for direct ByteBuffers. + */ + @Override + public void write(ByteBuffer byteBuffer) throws IOException + { + if (byteBuffer.hasArray()) + { + write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining()); + } + else + { + assert byteBuffer.isDirect(); + MemoryUtil.duplicateDirectByteBuffer(byteBuffer, hollowBuffer); + buffer.writeBytes(hollowBuffer); + } + } + + @Override + public void write(Memory memory, long offset, long length) throws IOException + { + for (ByteBuffer buffer : memory.asByteBuffers(offset, length)) + write(buffer); + } + + @Override + public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void writeVInt(long v) throws IOException + { + writeUnsignedVInt(VIntCoding.encodeZigZag64(v)); + } + + @Override + public void writeUnsignedVInt(long v) throws IOException + { + int size = VIntCoding.computeUnsignedVIntSize(v); + if (size == 1) + { + buffer.writeByte((byte) (v & 0xFF)); + return; + } + + buffer.writeBytes(VIntCoding.encodeVInt(v, size), 0, size); + } + + @Override + public void write(int b) throws IOException + { + buffer.writeByte((byte) (b & 0xFF)); + } + + @Override + public void writeByte(int v) throws IOException + { + buffer.writeByte((byte) (v & 0xFF)); + } + + @Override + public void writeBytes(String s) throws IOException + { + for (int index = 0; index < s.length(); index++) + buffer.writeByte(s.charAt(index) & 0xFF); + } + + @Override + public void writeChars(String s) throws IOException + { + for (int index = 0; index < s.length(); index++) + buffer.writeChar(s.charAt(index)); + } + + @Override + public void writeUTF(String s) throws IOException + { + UnbufferedDataOutputStreamPlus.writeUTF(s, this); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org