Repository: cassandra Updated Branches: refs/heads/trunk 60e2e9826 -> 732d1af86
Outbound TCP connections should consult internode authenticator. Patch by Ariel Weisberg; Reviewed by Marcus Eriksson for CASSANDRA-13324 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732d1af8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732d1af8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732d1af8 Branch: refs/heads/trunk Commit: 732d1af866b91e5ba63e7e2a467d99d4cb90e11f Parents: 60e2e98 Author: Ariel Weisberg <aweisb...@apple.com> Authored: Fri Mar 24 15:26:50 2017 -0400 Committer: Ariel Weisberg <aweisb...@apple.com> Committed: Fri Mar 24 15:26:50 2017 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/auth/AuthConfig.java | 10 +--- .../cassandra/config/DatabaseDescriptor.java | 5 +- .../locator/ReconnectableSnitchHelper.java | 21 +++++-- .../apache/cassandra/net/MessagingService.java | 44 ++++++++++++-- .../cassandra/net/OutboundTcpConnection.java | 33 +++++++--- .../net/OutboundTcpConnectionPool.java | 9 ++- .../config/DatabaseDescriptorRefTest.java | 1 + .../locator/ReconnectableSnitchHelperTest.java | 63 ++++++++++++++++++++ .../cassandra/net/MessagingServiceTest.java | 60 +++++++++++++++++++ 10 files changed, 218 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fb9b8c4..b42bde6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324) * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360) * Cleanup ParentRepairSession after repairs (CASSANDRA-13359) * Incremental repair not streaming correct sstables (CASSANDRA-13328) http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/auth/AuthConfig.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/AuthConfig.java b/src/java/org/apache/cassandra/auth/AuthConfig.java index c389ae4..2ca1522 100644 --- a/src/java/org/apache/cassandra/auth/AuthConfig.java +++ b/src/java/org/apache/cassandra/auth/AuthConfig.java @@ -25,6 +25,7 @@ import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.FBUtilities; +import org.hsqldb.Database; /** * Only purpose is to Initialize authentication/authorization via {@link #applyAuth()}. @@ -94,13 +95,8 @@ public final class AuthConfig // authenticator - IInternodeAuthenticator internodeAuthenticator; if (conf.internode_authenticator != null) - internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator"); - else - internodeAuthenticator = new AllowAllInternodeAuthenticator(); - - DatabaseDescriptor.setInternodeAuthenticator(internodeAuthenticator); + DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator, "internode_authenticator")); // Validate at last to have authenticator, authorizer, role-manager and internode-auth setup // in case these rely on each other. @@ -108,6 +104,6 @@ public final class AuthConfig authenticator.validateConfiguration(); authorizer.validateConfiguration(); roleManager.validateConfiguration(); - internodeAuthenticator.validateConfiguration(); + DatabaseDescriptor.getInternodeAuthenticator().validateConfiguration(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4fb742c..465cd8a 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -29,6 +29,7 @@ import java.nio.file.Paths; import java.util.*; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; @@ -36,6 +37,7 @@ import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.auth.AllowAllInternodeAuthenticator; import org.apache.cassandra.auth.AuthConfig; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.IAuthorizer; @@ -79,7 +81,7 @@ public class DatabaseDescriptor private static InetAddress rpcAddress; private static InetAddress broadcastRpcAddress; private static SeedProvider seedProvider; - private static IInternodeAuthenticator internodeAuthenticator; + private static IInternodeAuthenticator internodeAuthenticator = new AllowAllInternodeAuthenticator(); /* Hashing strategy Random or OPHF */ private static IPartitioner partitioner; @@ -1538,6 +1540,7 @@ public class DatabaseDescriptor public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator) { + Preconditions.checkNotNull(internodeAuthenticator); DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java index a6bec0c..08f0a14 100644 --- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java +++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java @@ -21,8 +21,12 @@ package org.apache.cassandra.locator; import java.net.InetAddress; import java.net.UnknownHostException; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.gms.*; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.OutboundTcpConnectionPool; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +53,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber { try { - reconnect(publicAddress, InetAddress.getByName(localAddressValue.value)); + reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc); } catch (UnknownHostException e) { @@ -57,12 +61,21 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber } } - private void reconnect(InetAddress publicAddress, InetAddress localAddress) + @VisibleForTesting + static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc) { + OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress); + //InternodeAuthenticator said don't connect + if (cp == null) + { + logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress); + return; + } + if (snitch.getDatacenter(publicAddress).equals(localDc) - && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress)) + && !cp.endPoint().equals(localAddress)) { - MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress); + cp.reset(localAddress); logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress, publicAddress); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 729c042..55604d0 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -416,7 +416,8 @@ public final class MessagingService implements MessagingServiceMBean /* Lookup table for registering message handlers based on the verb. */ private final Map<Verb, IVerbHandler> verbHandlers; - private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>(); + @VisibleForTesting + final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(MessagingService.class); private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000; @@ -531,6 +532,10 @@ public final class MessagingService implements MessagingServiceMBean maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout); ConnectionMetrics.totalTimeouts.mark(); + OutboundTcpConnectionPool cp = getConnectionPool(expiredCallbackInfo.target); + if (cp != null) + cp.incrementTimeout(); + getConnectionPool(expiredCallbackInfo.target).incrementTimeout(); if (expiredCallbackInfo.callback.supportsBackPressure()) @@ -670,8 +675,16 @@ public final class MessagingService implements MessagingServiceMBean */ public void convict(InetAddress ep) { - logger.trace("Resetting pool for {}", ep); - getConnectionPool(ep).reset(); + OutboundTcpConnectionPool cp = getConnectionPool(ep); + if (cp != null) + { + logger.trace("Resetting pool for {}", ep); + getConnectionPool(ep).reset(); + } + else + { + logger.debug("Not resetting pool for {} because internode authenticator said not to connect", ep); + } } public void listen() @@ -795,11 +808,22 @@ public final class MessagingService implements MessagingServiceMBean connectionManagers.remove(to); } + /** + * Get a connection pool to the specified endpoint. Constructs one if none exists. + * + * Can return null if the InternodeAuthenticator fails to authenticate the node. + * @param to + * @return The connection pool or null if internode authenticator says not to + */ public OutboundTcpConnectionPool getConnectionPool(InetAddress to) { OutboundTcpConnectionPool cp = connectionManagers.get(to); if (cp == null) { + //Don't attempt to connect to nodes that won't (or shouldn't) authenticate anyways + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, OutboundTcpConnectionPool.portFor(to))) + return null; + cp = new OutboundTcpConnectionPool(to, backPressure.newState(to)); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) @@ -811,10 +835,17 @@ public final class MessagingService implements MessagingServiceMBean return cp; } - + /** + * Get a connection for a message to a specific endpoint. Constructs one if none exists. + * + * Can return null if the InternodeAuthenticator fails to authenticate the node. + * @param to + * @return The connection or null if internode authenticator says not to + */ public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) { - return getConnectionPool(to).getConnection(msg); + OutboundTcpConnectionPool cp = getConnectionPool(to); + return cp == null ? null : cp.getConnection(msg); } /** @@ -968,7 +999,8 @@ public final class MessagingService implements MessagingServiceMBean OutboundTcpConnection connection = getConnection(to, message); // write it - connection.enqueue(message, id); + if (connection != null) + connection.enqueue(message, id); } public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to) http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index feff527..9b19eab 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -249,6 +249,12 @@ public class OutboundTcpConnection extends FastThreadLocalThread break inner; } } + catch (InternodeAuthFailed e) + { + logger.warn("Internode auth failed connecting to " + poolReference.endPoint()); + //Remove the connection pool and other thread so messages aren't queued + MessagingService.instance().destroyConnectionPool(poolReference.endPoint()); + } catch (Exception e) { JVMStabilityInspector.inspectThrowable(e); @@ -394,20 +400,27 @@ public class OutboundTcpConnection extends FastThreadLocalThread } @SuppressWarnings("resource") - private boolean connect() + private boolean connect() throws InternodeAuthFailed { - logger.debug("Attempting to connect to {}", poolReference.endPoint()); + InetAddress endpoint = poolReference.endPoint(); + if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, poolReference.portFor(endpoint))) + { + throw new InternodeAuthFailed(); + } + + logger.debug("Attempting to connect to {}", endpoint); + long start = System.nanoTime(); long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout()); while (System.nanoTime() - start < timeout) { - targetVersion = MessagingService.instance().getVersion(poolReference.endPoint()); + targetVersion = MessagingService.instance().getVersion(endpoint); try { socket = poolReference.newSocket(); socket.setKeepAlive(true); - if (isLocalDC(poolReference.endPoint())) + if (isLocalDC(endpoint)) { socket.setTcpNoDelay(INTRADC_TCP_NODELAY); } @@ -446,7 +459,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread } else { - MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion); + MessagingService.instance().setVersion(endpoint, maxTargetVersion); } if (targetVersion > maxTargetVersion) @@ -454,7 +467,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread logger.trace("Target max version is {}; will reconnect with that version", maxTargetVersion); try { - if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint())) + if (DatabaseDescriptor.getSeeds().contains(endpoint)) logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion); } catch (Throwable e) @@ -484,7 +497,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread if (shouldCompressConnection()) { out.flush(); - logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint()); + logger.trace("Upgrading OutputStream to {} to be compressed", endpoint); // TODO: custom LZ4 OS that supports BB write methods LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); @@ -495,7 +508,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread checksum, true)); // no async flushing } - logger.debug("Done connecting to {}", poolReference.endPoint()); + logger.debug("Done connecting to {}", endpoint); return true; } catch (SSLHandshakeException e) @@ -508,7 +521,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread catch (IOException e) { socket = null; - logger.debug("Unable to connect to {}", poolReference.endPoint(), e); + logger.debug("Unable to connect to {}", endpoint, e); Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS); } } @@ -613,4 +626,6 @@ public class OutboundTcpConnection extends FastThreadLocalThread return false; } } + + private static class InternodeAuthFailed extends Exception {} } http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 9f9ffee..20a8da6 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -148,6 +148,11 @@ public class OutboundTcpConnectionPool } } + public static int portFor(InetAddress endpoint) + { + return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort(); + } + public InetAddress endPoint() { if (id.equals(FBUtilities.getBroadcastAddress())) @@ -218,7 +223,7 @@ public class OutboundTcpConnectionPool smallMessages.closeSocket(true); if (gossipMessages != null) gossipMessages.closeSocket(true); - - metrics.release(); + if (metrics != null) + metrics.release(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 17cdd77..c8f8bc1 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -54,6 +54,7 @@ import static org.junit.Assert.fail; public class DatabaseDescriptorRefTest { static final String[] validClasses = { + "org.apache.cassandra.auth.AllowAllInternodeAuthenticator", "org.apache.cassandra.auth.IInternodeAuthenticator", "org.apache.cassandra.auth.IAuthenticator", "org.apache.cassandra.auth.IAuthorizer", http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java new file mode 100644 index 0000000..232865a --- /dev/null +++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; + +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.MessagingServiceTest; + +public class ReconnectableSnitchHelperTest +{ + static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator(); + + @BeforeClass + public static void beforeClass() throws UnknownHostException + { + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap())); + DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1")); + } + + /** + * Make sure that if a node fails internode authentication and MessagingService returns a null + * pool that ReconnectableSnitchHelper fails gracefully. + */ + @Test + public void failedAuthentication() throws Exception + { + DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR); + InetAddress address = InetAddress.getByName("127.0.0.250"); + //Should tolerate null returns by MS for the connection + ReconnectableSnitchHelper.reconnect(address, address, null, null); + } + + @After + public void replaceAuthenticator() + { + DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index d9a9915..e6b5cd0 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -36,12 +36,16 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Iterables; import com.codahale.metrics.Timer; +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.monitoring.ApproximateTime; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.caffinitas.ohc.histo.EstimatedHistogram; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -52,6 +56,20 @@ public class MessagingServiceTest { private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets(); + public static final IInternodeAuthenticator ALLOW_NOTHING_AUTHENTICATOR = new IInternodeAuthenticator() + { + public boolean authenticate(InetAddress remoteAddress, int remotePort) + { + return false; + } + + public void validateConfiguration() throws ConfigurationException + { + + } + }; + static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator(); + private final MessagingService messagingService = MessagingService.test(); @BeforeClass @@ -368,4 +386,46 @@ public class MessagingServiceTest throw new UnsupportedOperationException("Not supported."); } } + + /** + * Make sure that if internode authenticatino fails for an outbound connection that all the code that relies + * on getting the connection pool handles the null return + * @throws Exception + */ + @Test + public void testFailedInternodeAuth() throws Exception + { + MessagingService ms = MessagingService.instance(); + DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); + InetAddress address = InetAddress.getByName("127.0.0.250"); + + //Should return null + assertNull(ms.getConnectionPool(address)); + assertNull(ms.getConnection(address, new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK))); + + //Should tolerate null + ms.convict(address); + ms.sendOneWay(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), address); + } + + @Test + public void testOutboundTcpConnectionCleansUp() throws Exception + { + MessagingService ms = MessagingService.instance(); + DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); + InetAddress address = InetAddress.getByName("127.0.0.250"); + OutboundTcpConnectionPool pool = new OutboundTcpConnectionPool(address, new MockBackPressureStrategy(null).newState(address)); + ms.connectionManagers.put(address, pool); + pool.smallMessages.start(); + pool.smallMessages.enqueue(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0); + pool.smallMessages.join(); + assertFalse(ms.connectionManagers.containsKey(address)); + } + + @After + public void replaceAuthenticator() + { + DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); + } + }