http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java index 522b118..df81a9b 100644 --- a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java @@ -18,7 +18,7 @@ package org.apache.cassandra.locator; import java.io.IOException; -import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -32,8 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; - -import com.google.common.net.InetAddresses; +import java.util.regex.Matcher; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; @@ -61,7 +60,7 @@ public class PropertyFileSnitchTest private Path backupFile; private VersionedValue.VersionedValueFactory valueFactory; - private Map<InetAddress, Set<Token>> tokenMap; + private Map<InetAddressAndPort, Set<Token>> tokenMap; @BeforeClass public static void setupDD() @@ -78,17 +77,17 @@ public class PropertyFileSnitchTest restoreOrigConfigFile(); - InetAddress[] hosts = { - InetAddress.getByName("127.0.0.1"), // this exists in the config file - InetAddress.getByName("127.0.0.2"), // this exists in the config file - InetAddress.getByName("127.0.0.9"), // this does not exist in the config file + InetAddressAndPort[] hosts = { + InetAddressAndPort.getByName("127.0.0.1"), // this exists in the config file + InetAddressAndPort.getByName("127.0.0.2"), // this exists in the config file + InetAddressAndPort.getByName("127.0.0.9"), // this does not exist in the config file }; IPartitioner partitioner = new 
RandomPartitioner(); valueFactory = new VersionedValue.VersionedValueFactory(partitioner); tokenMap = new HashMap<>(); - for (InetAddress host : hosts) + for (InetAddressAndPort host : hosts) { Set<Token> tokens = Collections.singleton(partitioner.getRandomToken()); Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1); @@ -117,13 +116,21 @@ public class PropertyFileSnitchTest for (String line : lines) { String[] info = line.split("="); - if (info.length == 2 && replacements.containsKey(info[0])) + if (info.length == 2 && !line.startsWith("#") && !line.startsWith("default=")) { - String replacement = replacements.get(info[0]); - if (!replacement.isEmpty()) // empty means remove this line - newLines.add(info[0] + '=' + replacement); - - replaced.add(info[0]); + InetAddressAndPort address = InetAddressAndPort.getByName(info[0].replaceAll(Matcher.quoteReplacement("\\:"), ":")); + String replacement = replacements.get(address.toString()); + if (replacement != null) + { + if (!replacement.isEmpty()) // empty means remove this line + newLines.add(info[0] + '=' + replacement); + + replaced.add(address.toString()); + } + else + { + newLines.add(line); + } } else { @@ -138,21 +145,26 @@ public class PropertyFileSnitchTest continue; if (!replacement.getValue().isEmpty()) // empty means remove this line so do nothing here - newLines.add(replacement.getKey() + '=' + replacement.getValue()); + { + String escaped = replacement.getKey().replaceAll(Matcher.quoteReplacement(":"), "\\\\:"); + newLines.add(escaped + '=' + replacement.getValue()); + } } Files.write(effectiveFile, newLines, StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING); } - private void setNodeShutdown(InetAddress host) + private void setNodeShutdown(InetAddressAndPort host) { StorageService.instance.getTokenMetadata().removeEndpoint(host); + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.shutdown(true)); 
Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true)); Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host)); } - private void setNodeLive(InetAddress host) + private void setNodeLive(InetAddressAndPort host) { + Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokenMap.get(host))); Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host))); Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host)); StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host); @@ -160,9 +172,9 @@ public class PropertyFileSnitchTest private static void checkEndpoint(final AbstractNetworkTopologySnitch snitch, final String endpointString, final String expectedDatacenter, - final String expectedRack) + final String expectedRack) throws UnknownHostException { - final InetAddress endpoint = InetAddresses.forString(endpointString); + final InetAddressAndPort endpoint = InetAddressAndPort.getByName(endpointString); assertEquals(expectedDatacenter, snitch.getDatacenter(endpoint)); assertEquals(expectedRack, snitch.getRack(endpoint)); } @@ -174,25 +186,25 @@ public class PropertyFileSnitchTest @Test public void testChangeHostRack() throws Exception { - final InetAddress host = InetAddress.getByName("127.0.0.1"); + final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1"); final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC1"); try { setNodeLive(host); Files.copy(effectiveFile, backupFile); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2")); + replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2")); Thread.sleep(1500); - 
checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC1"); setNodeShutdown(host); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2")); + replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2")); Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC2"); } finally { @@ -208,25 +220,25 @@ public class PropertyFileSnitchTest @Test public void testChangeHostDc() throws Exception { - final InetAddress host = InetAddress.getByName("127.0.0.1"); + final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1"); final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC1"); try { setNodeLive(host); Files.copy(effectiveFile, backupFile); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1")); + replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1")); Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC1"); setNodeShutdown(host); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1")); + replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1")); Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1"); + checkEndpoint(snitch, host.toString(), "DC2", "RAC1"); } finally { @@ -243,25 +255,25 @@ public class PropertyFileSnitchTest @Test public void testAddHost() throws Exception { - final InetAddress host = InetAddress.getByName("127.0.0.9"); + final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9"); final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); - checkEndpoint(snitch, host.getHostAddress(), "DC1", 
"r1"); // default + checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default try { setNodeLive(host); Files.copy(effectiveFile, backupFile); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there + replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged setNodeShutdown(host); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there + replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed + checkEndpoint(snitch, host.toString(), "DC2", "RAC2"); // changed } finally { @@ -278,25 +290,25 @@ public class PropertyFileSnitchTest @Test public void testRemoveHost() throws Exception { - final InetAddress host = InetAddress.getByName("127.0.0.2"); + final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2"); final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); + checkEndpoint(snitch, host.toString(), "DC1", "RAC2"); try { setNodeLive(host); Files.copy(effectiveFile, backupFile); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found + replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged + checkEndpoint(snitch, host.toString(), "DC1", "RAC2"); // unchanged setNodeShutdown(host); - replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found + 
replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default } finally { @@ -313,9 +325,9 @@ public class PropertyFileSnitchTest @Test public void testChangeDefault() throws Exception { - final InetAddress host = InetAddress.getByName("127.0.0.9"); + final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9"); final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default + checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default try { @@ -325,13 +337,13 @@ public class PropertyFileSnitchTest replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged + checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged setNodeShutdown(host); replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default again (refresh file update) Thread.sleep(1500); - checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated + checkEndpoint(snitch, host.toString(), "DC2", "r2"); // default updated } finally {
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java index 232865a..b1c3775 100644 --- a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java +++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java @@ -50,7 +50,7 @@ public class ReconnectableSnitchHelperTest public void failedAuthentication() throws Exception { DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR); - InetAddress address = InetAddress.getByName("127.0.0.250"); + InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250"); //Should tolerate null returns by MS for the connection ReconnectableSnitchHelper.reconnect(address, address, null, null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java index b83194b..a8caa72 100644 --- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java +++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,14 +54,14 @@ public class ReplicationStrategyEndpointCacheTest strategy = getStrategyWithNewTokenMetadata(Keyspace.open(KEYSPACE).getReplicationStrategy(), tmd); - tmd.updateNormalToken(new 
BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4")); - //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7")); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddressAndPort.getByName("127.0.0.2")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddressAndPort.getByName("127.0.0.3")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddressAndPort.getByName("127.0.0.4")); + //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddressAndPort.getByName("127.0.0.5", null, null)); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddressAndPort.getByName("127.0.0.6")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddressAndPort.getByName("127.0.0.7")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddressAndPort.getByName("127.0.0.8")); } @Test @@ -90,31 +89,31 @@ public class ReplicationStrategyEndpointCacheTest public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception { setup(stratClass, configOptions); - ArrayList<InetAddress> initial; - ArrayList<InetAddress> endpoints; + 
ArrayList<InetAddressAndPort> initial; + ArrayList<InetAddressAndPort> endpoints; endpoints = strategy.getNaturalEndpoints(searchToken); assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); // test token addition, in DC2 before existing token initial = strategy.getNaturalEndpoints(searchToken); - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5")); endpoints = strategy.getNaturalEndpoints(searchToken); assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); assert !endpoints.equals(initial); // test token removal, newly created token initial = strategy.getNaturalEndpoints(searchToken); - tmd.removeEndpoint(InetAddress.getByName("127.0.0.5")); + tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5")); endpoints = strategy.getNaturalEndpoints(searchToken); assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); - assert !endpoints.contains(InetAddress.getByName("127.0.0.5")); + assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5")); assert !endpoints.equals(initial); // test token change initial = strategy.getNaturalEndpoints(searchToken); //move .8 after search token but before other DC3 - tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8")); + tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8")); endpoints = strategy.getNaturalEndpoints(searchToken); assert endpoints.size() == 5 : StringUtils.join(endpoints, ","); assert !endpoints.equals(initial); @@ -129,7 +128,7 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { 
assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; called = true; @@ -146,7 +145,7 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; called = true; @@ -163,7 +162,7 @@ public class ReplicationStrategyEndpointCacheTest super(keyspaceName, tokenMetadata, snitch, configOptions); } - public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) { assert !called : "calculateNaturalEndpoints was already called, result should have been cached"; called = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index f97a6e5..fe77b0e 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; @@ -98,22 +97,22 @@ public class SimpleStrategyTest { tmd = new TokenMetadata(); strategy = getStrategy(keyspaceName, tmd); - List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<InetAddressAndPort> hosts = new ArrayList<>(); for (int i = 0; i < endpointTokens.length; i++) { - InetAddress ep = 
InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); + InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1)); tmd.updateNormalToken(endpointTokens[i], ep); hosts.add(ep); } for (int i = 0; i < keyTokens.length; i++) { - List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i]); + List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]); assertEquals(strategy.getReplicationFactor(), endpoints.size()); - List<InetAddress> correctEndpoints = new ArrayList<InetAddress>(); + List<InetAddressAndPort> correctEndpoints = new ArrayList<>(); for (int j = 0; j < endpoints.size(); j++) correctEndpoints.add(hosts.get((i + j + 1) % hosts.size())); - assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints)); + assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints)); } } } @@ -135,17 +134,17 @@ public class SimpleStrategyTest keyTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i + RING_SIZE)); } - List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<InetAddressAndPort> hosts = new ArrayList<>(); for (int i = 0; i < endpointTokens.length; i++) { - InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); + InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." 
+ String.valueOf(i + 1)); tmd.updateNormalToken(endpointTokens[i], ep); hosts.add(ep); } // bootstrap at the end of the ring Token bsToken = new BigIntegerToken(String.valueOf(210)); - InetAddress bootstrapEndpoint = InetAddress.getByName("127.0.0.11"); + InetAddressAndPort bootstrapEndpoint = InetAddressAndPort.getByName("127.0.0.11"); tmd.addBootstrapToken(bsToken, bootstrapEndpoint); AbstractReplicationStrategy strategy = null; @@ -159,7 +158,7 @@ public class SimpleStrategyTest for (int i = 0; i < keyTokens.length; i++) { - Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i])); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i])); assertTrue(endpoints.size() >= replicationFactor); for (int j = 0; j < replicationFactor; j++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java index e5a86fd..b589d2d 100644 --- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java +++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.locator; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Map; @@ -56,8 +55,8 @@ public class TokenMetadataTest { DatabaseDescriptor.daemonInitialization(); tmd = StorageService.instance.getTokenMetadata(); - tmd.updateNormalToken(token(ONE), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6")); + tmd.updateNormalToken(token(ONE), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(token(SIX), 
InetAddressAndPort.getByName("127.0.0.6")); } private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected) @@ -98,8 +97,8 @@ public class TokenMetadataTest @Test public void testTopologyUpdate_RackConsolidation() throws UnknownHostException { - final InetAddress first = InetAddress.getByName("127.0.0.1"); - final InetAddress second = InetAddress.getByName("127.0.0.6"); + final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6"); final String DATA_CENTER = "datacenter1"; final String RACK1 = "rack1"; final String RACK2 = "rack2"; @@ -107,19 +106,19 @@ public class TokenMetadataTest DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { @Override - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return endpoint.equals(first) ? RACK1 : RACK2; } @Override - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } @Override - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -134,14 +133,14 @@ public class TokenMetadataTest TokenMetadata.Topology topology = tokenMetadata.getTopology(); assertNotNull(topology); - Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints(); + Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints(); assertNotNull(allEndpoints); assertTrue(allEndpoints.size() == 2); assertTrue(allEndpoints.containsKey(DATA_CENTER)); assertTrue(allEndpoints.get(DATA_CENTER).contains(first)); assertTrue(allEndpoints.get(DATA_CENTER).contains(second)); - Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks(); + Map<String, Multimap<String, 
InetAddressAndPort>> racks = topology.getDatacenterRacks(); assertNotNull(racks); assertTrue(racks.size() == 1); assertTrue(racks.containsKey(DATA_CENTER)); @@ -154,19 +153,19 @@ public class TokenMetadataTest DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { @Override - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return RACK1; } @Override - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } @Override - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -196,8 +195,8 @@ public class TokenMetadataTest @Test public void testTopologyUpdate_RackExpansion() throws UnknownHostException { - final InetAddress first = InetAddress.getByName("127.0.0.1"); - final InetAddress second = InetAddress.getByName("127.0.0.6"); + final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6"); final String DATA_CENTER = "datacenter1"; final String RACK1 = "rack1"; final String RACK2 = "rack2"; @@ -205,19 +204,19 @@ public class TokenMetadataTest DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { @Override - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return RACK1; } @Override - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } @Override - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -232,14 +231,14 @@ public class TokenMetadataTest TokenMetadata.Topology topology = tokenMetadata.getTopology(); 
assertNotNull(topology); - Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints(); + Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints(); assertNotNull(allEndpoints); assertTrue(allEndpoints.size() == 2); assertTrue(allEndpoints.containsKey(DATA_CENTER)); assertTrue(allEndpoints.get(DATA_CENTER).contains(first)); assertTrue(allEndpoints.get(DATA_CENTER).contains(second)); - Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks(); + Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks(); assertNotNull(racks); assertTrue(racks.size() == 1); assertTrue(racks.containsKey(DATA_CENTER)); @@ -252,19 +251,19 @@ public class TokenMetadataTest DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { @Override - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return endpoint.equals(first) ? RACK1 : RACK2; } @Override - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } @Override - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } @@ -293,8 +292,8 @@ public class TokenMetadataTest @Test public void testEndpointSizes() throws UnknownHostException { - final InetAddress first = InetAddress.getByName("127.0.0.1"); - final InetAddress second = InetAddress.getByName("127.0.0.6"); + final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1"); + final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6"); tmd.updateNormalToken(token(ONE), first); tmd.updateNormalToken(token(SIX), second); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java index 2394e0c..2b13715 100644 --- a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java +++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java @@ -20,7 +20,6 @@ */ package org.apache.cassandra.metrics; -import java.net.InetAddress; import java.util.Map; import java.util.UUID; @@ -35,6 +34,7 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.hints.HintsService; +import org.apache.cassandra.locator.InetAddressAndPort; import static org.junit.Assert.assertEquals; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; @@ -53,11 +53,15 @@ public class HintedHandOffMetricsTest DatabaseDescriptor.getHintsDirectory().mkdirs(); for (int i = 0; i < 99; i++) - HintsService.instance.metrics.incrPastWindow(InetAddress.getByName("127.0.0.1")); + HintsService.instance.metrics.incrPastWindow(InetAddressAndPort.getLocalHost()); HintsService.instance.metrics.log(); - UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS); + UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_V2); Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance); assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99); + + rows = executeInternal("SELECT hints_dropped FROM system." 
+ SystemKeyspace.LEGACY_PEER_EVENTS); + returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance); + assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java new file mode 100644 index 0000000..3232455 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.nio.ByteBuffer; + +import org.junit.Test; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; + +import static org.junit.Assert.assertEquals; + +public class CompactEndpointSerializationHelperTest +{ + + @Test + public void testRoundtrip() throws Exception + { + InetAddressAndPort ipv4 = InetAddressAndPort.getByName("127.0.0.1:42"); + InetAddressAndPort ipv6 = InetAddressAndPort.getByName("[2001:db8:0:0:0:ff00:42:8329]:42"); + + testAddress(ipv4, MessagingService.VERSION_30); + testAddress(ipv6, MessagingService.VERSION_30); + testAddress(ipv4, MessagingService.current_version); + testAddress(ipv6, MessagingService.current_version); + } + + private void testAddress(InetAddressAndPort address, int version) throws Exception + { + ByteBuffer out; + try (DataOutputBuffer dob = new DataOutputBuffer()) + { + CompactEndpointSerializationHelper.instance.serialize(address, dob, version); + out = dob.buffer(); + } + assertEquals(out.remaining(), CompactEndpointSerializationHelper.instance.serializedSize(address, version)); + + InetAddressAndPort roundtripped; + try (DataInputBuffer dib = new DataInputBuffer(out, false)) + { + roundtripped = CompactEndpointSerializationHelper.instance.deserialize(dib, version); + } + + if (version >= MessagingService.VERSION_40) + { + assertEquals(address, roundtripped); + } + else + { + assertEquals(roundtripped.address, address.address); + assertEquals(7000, roundtripped.port); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java new file mode 100644 index 0000000..195d734 --- 
/dev/null +++ b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; + +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ForwardToContainerTest +{ + @Test + public void testCurrent() throws Exception + { + testVersion(MessagingService.current_version); + } + + @Test + public void test30() throws Exception + { + testVersion(MessagingService.VERSION_30); + } + + private void testVersion(int version) throws Exception + { + InetAddressAndPort.initializeDefaultPort(65532); + List<InetAddressAndPort> addresses = ImmutableList.of(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 42), + InetAddressAndPort.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.1:7000"), + 
InetAddressAndPort.getByNameOverrideDefaults("2001:0db8:0000:0000:0000:ff00:0042:8329", 42), + InetAddressAndPort.getByName("2001:0db8:0000:0000:0000:ff00:0042:8329"), + InetAddressAndPort.getByName("[2001:0db8:0000:0000:0000:ff00:0042:8329]:7000")); + + ForwardToContainer ftc = new ForwardToContainer(addresses, new int[] { 44, 45, 46, 47, 48, 49 }); + ByteBuffer buffer; + try (DataOutputBuffer dob = new DataOutputBuffer()) + { + ForwardToSerializer.instance.serialize(ftc, dob, version); + buffer = dob.buffer(); + } + + assertEquals(buffer.remaining(), ForwardToSerializer.instance.serializedSize(ftc, version)); + + ForwardToContainer deserialized; + try (DataInputBuffer dib = new DataInputBuffer(buffer, false)) + { + deserialized = ForwardToSerializer.instance.deserialize(dib, version); + } + + assertTrue(Arrays.equals(ftc.messageIds, deserialized.messageIds)); + + Iterator<InetAddressAndPort> iterator = deserialized.targets.iterator(); + if (version >= MessagingService.VERSION_40) + { + for (int ii = 0; ii < addresses.size(); ii++) + { + InetAddressAndPort original = addresses.get(ii); + InetAddressAndPort roundtripped = iterator.next(); + assertEquals(original, roundtripped); + } + } + else + { + for (int ii = 0; ii < addresses.size(); ii++) + { + InetAddressAndPort original = addresses.get(ii); + InetAddressAndPort roundtripped = iterator.next(); + assertEquals(original.address, roundtripped.address); + //3.0 can't send port numbers so you get the defaults + assertEquals(65532, roundtripped.port); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/Matcher.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java index cd1b667..27b685f 100644 --- a/test/unit/org/apache/cassandra/net/Matcher.java +++ b/test/unit/org/apache/cassandra/net/Matcher.java @@ -17,7 +17,7 @@ */ 
package org.apache.cassandra.net; -import java.net.InetAddress; +import org.apache.cassandra.locator.InetAddressAndPort; /** * Predicate based on intercepted, outgoing messange and the message's destination address. @@ -28,5 +28,5 @@ public interface Matcher<T> * @param obj intercepted outgoing message * @param to destination address */ - public boolean matches(MessageOut<T> obj, InetAddress to); + public boolean matches(MessageOut<T> obj, InetAddressAndPort to); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MatcherResponse.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java index 6cd8085..7a1772a 100644 --- a/test/unit/org/apache/cassandra/net/MatcherResponse.java +++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.util.Collections; import java.util.HashSet; import java.util.Queue; @@ -27,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * Sends a response for an incoming message with a matching {@link Matcher}. * The actual behavior by any instance of this class can be inspected by @@ -76,7 +77,7 @@ public class MatcherResponse * Respond with the message created by the provided function that will be called with each intercepted outbound message. 
* @param fnResponse function to call for creating reply based on intercepted message and target address */ - public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse) + public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse) { return respondN(fnResponse, Integer.MAX_VALUE); } @@ -101,7 +102,7 @@ public class MatcherResponse */ public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit) { - return respondN((MessageOut<T> msg, InetAddress to) -> { + return respondN((MessageOut<T> msg, InetAddressAndPort to) -> { S payload = fnResponse.apply(msg); if (payload == null) return null; @@ -147,7 +148,7 @@ public class MatcherResponse * each intercepted outbound message. * @param fnResponse function to call for creating reply based on intercepted message and target address */ - public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit) + public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse, int limit) { limitCounter.set(limit); @@ -155,7 +156,7 @@ public class MatcherResponse sink = new IMessageSink() { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to) { // prevent outgoing message from being send in case matcher indicates a match // and instead send the mocked response http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index f0a959e..4ce3422 100644 --- 
a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -48,6 +48,10 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService.ServerChannel; import org.apache.cassandra.net.async.NettyFactory; import org.apache.cassandra.net.async.OutboundConnectionIdentifier; @@ -81,7 +85,7 @@ public class MessagingServiceTest }; private static IInternodeAuthenticator originalAuthenticator; private static ServerEncryptionOptions originalServerEncryptionOptions; - private static InetAddress originalListenAddress; + private static InetAddressAndPort originalListenAddress; private final MessagingService messagingService = MessagingService.test(); @@ -93,7 +97,7 @@ public class MessagingServiceTest DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1")); originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator(); originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions(); - originalListenAddress = DatabaseDescriptor.getListenAddress(); + originalListenAddress = InetAddressAndPort.getByAddressOverrideDefaults(DatabaseDescriptor.getListenAddress(), DatabaseDescriptor.getStoragePort()); } private static int metricScopeId = 0; @@ -103,8 +107,8 @@ public class MessagingServiceTest { messagingService.resetDroppedMessagesMap(Integer.toString(metricScopeId++)); MockBackPressureStrategy.applied = false; - 
messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2")); - messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3")); + messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.2")); + messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.3")); } @After @@ -113,7 +117,7 @@ public class MessagingServiceTest DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator); DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions); DatabaseDescriptor.setShouldListenOnBroadcastAddress(false); - DatabaseDescriptor.setListenAddress(originalListenAddress); + DatabaseDescriptor.setListenAddress(originalListenAddress.address); FBUtilities.reset(); } @@ -221,44 +225,44 @@ public class MessagingServiceTest @Test public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); MessageOut<?> ignored = null; DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), noCallback, ignored); + messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), noCallback, ignored); assertFalse(backPressureState.onSend); DatabaseDescriptor.setBackPressureEnabled(false); - messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored); + messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), 
bpCallback, ignored); assertFalse(backPressureState.onSend); DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored); + messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored); assertTrue(backPressureState.onSend); } @Test public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); boolean timeout = false; DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout); assertFalse(backPressureState.onReceive); assertFalse(backPressureState.onTimeout); DatabaseDescriptor.setBackPressureEnabled(false); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout); assertFalse(backPressureState.onReceive); assertFalse(backPressureState.onTimeout); DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout); 
assertTrue(backPressureState.onReceive); assertFalse(backPressureState.onTimeout); } @@ -266,23 +270,23 @@ public class MessagingServiceTest @Test public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException { - MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2")); + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2")); IAsyncCallback bpCallback = new BackPressureCallback(); IAsyncCallback noCallback = new NoBackPressureCallback(); boolean timeout = true; DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout); assertFalse(backPressureState.onReceive); assertFalse(backPressureState.onTimeout); DatabaseDescriptor.setBackPressureEnabled(false); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout); assertFalse(backPressureState.onReceive); assertFalse(backPressureState.onTimeout); DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout); assertFalse(backPressureState.onReceive); assertTrue(backPressureState.onTimeout); } @@ -291,11 +295,11 @@ public class MessagingServiceTest public void testAppliesBackPressureWhenEnabled() throws UnknownHostException { 
DatabaseDescriptor.setBackPressureEnabled(false); - messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND); + messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND); assertFalse(MockBackPressureStrategy.applied); DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND); + messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND); assertTrue(MockBackPressureStrategy.applied); } @@ -303,13 +307,13 @@ public class MessagingServiceTest public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException { DatabaseDescriptor.setBackPressureEnabled(true); - messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")), ONE_SECOND); + messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.1")), ONE_SECOND); assertFalse(MockBackPressureStrategy.applied); } private static void addDCLatency(long sentAt, long nowTime) throws IOException { - MessageIn.deriveConstructionTime(InetAddress.getLocalHost(), (int)sentAt, nowTime); + MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), (int)sentAt, nowTime); } public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState> @@ -328,19 +332,19 @@ public class MessagingServiceTest } @Override - public MockBackPressureState newState(InetAddress host) + public MockBackPressureState newState(InetAddressAndPort host) { return new MockBackPressureState(host); } public static class MockBackPressureState implements BackPressureState { - private final InetAddress host; + private final InetAddressAndPort host; public volatile boolean onSend = false; public volatile boolean onReceive = false; public volatile boolean onTimeout = false; - private 
MockBackPressureState(InetAddress host) + private MockBackPressureState(InetAddressAndPort host) { this.host = host; } @@ -370,7 +374,7 @@ public class MessagingServiceTest } @Override - public InetAddress getHost() + public InetAddressAndPort getHost() { return host; } @@ -429,7 +433,7 @@ public class MessagingServiceTest { MessagingService ms = MessagingService.instance(); DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR); - InetAddress address = InetAddress.getByName("127.0.0.250"); + InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250"); //Should return null MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK); @@ -444,20 +448,20 @@ public class MessagingServiceTest public void testOutboundMessagingConnectionCleansUp() throws Exception { MessagingService ms = MessagingService.instance(); - InetSocketAddress local = new InetSocketAddress("127.0.0.1", 9876); - InetSocketAddress remote = new InetSocketAddress("127.0.0.2", 9876); + InetAddressAndPort local = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 9876); + InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 9876); - OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote.getAddress()), ALLOW_NOTHING_AUTHENTICATOR); - ms.channelManagers.put(remote.getAddress(), pool); + OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote), ALLOW_NOTHING_AUTHENTICATOR); + ms.channelManagers.put(remote, pool); pool.sendMessage(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0); - assertFalse(ms.channelManagers.containsKey(remote.getAddress())); + assertFalse(ms.channelManagers.containsKey(remote)); } @Test - public void reconnectWithNewIp() throws UnknownHostException + public void reconnectWithNewIp() throws Exception { - InetAddress publicIp = 
InetAddress.getByName("127.0.0.2"); - InetAddress privateIp = InetAddress.getByName("127.0.0.3"); + InetAddressAndPort publicIp = InetAddressAndPort.getByName("127.0.0.2"); + InetAddressAndPort privateIp = InetAddressAndPort.getByName("127.0.0.3"); // reset the preferred IP value, for good test hygene SystemKeyspace.updatePreferredIP(publicIp, publicIp); @@ -485,8 +489,8 @@ public class MessagingServiceTest Assert.assertEquals(0, serverChannel.size()); // now, create a connection and make sure it's in a channel group - InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort()); - OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server); + InetAddressAndPort server = FBUtilities.getBroadcastAddressAndPort(); + OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0), server); CountDownLatch latch = new CountDownLatch(1); OutboundConnectionParams params = OutboundConnectionParams.builder() @@ -596,7 +600,7 @@ public class MessagingServiceTest if (listenOnBroadcastAddr) { DatabaseDescriptor.setShouldListenOnBroadcastAddress(true); - listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress()); + listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddressAndPort().address); DatabaseDescriptor.setListenAddress(listenAddress); FBUtilities.reset(); } @@ -627,7 +631,7 @@ public class MessagingServiceTest if (serverEncryptionOptions.enable_legacy_ssl_storage_port) { - if (legacySslPort == serverChannel.getAddress().getPort()) + if (legacySslPort == serverChannel.getAddress().port) { foundLegacyListenSslAddress = true; Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel()); @@ -646,7 +650,7 @@ public class MessagingServiceTest int found = 0; for (ServerChannel serverChannel : 
messagingService.serverChannels) { - if (serverChannel.getAddress().getAddress().equals(listenAddress)) + if (serverChannel.getAddress().address.equals(listenAddress)) found++; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingService.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java index 0412759..4ea1bf5 100644 --- a/test/unit/org/apache/cassandra/net/MockMessagingService.java +++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java @@ -17,14 +17,15 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.function.Predicate; +import org.apache.cassandra.locator.InetAddressAndPort; + /** * Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be * intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}. - * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)}, + * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddressAndPort)}, * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be * nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}. * After each test, {@link MockMessagingService#cleanup()} must be called for free listeners registered @@ -58,11 +59,11 @@ public class MockMessagingService * Creates a matcher that will indicate if the target address of the outgoing message equals the * provided address. 
*/ - public static Matcher<InetAddress> to(String address) + public static Matcher<InetAddressAndPort> to(String address) { try { - return to(InetAddress.getByName(address)); + return to(InetAddressAndPort.getByName(address)); } catch (UnknownHostException e) { @@ -74,7 +75,7 @@ public class MockMessagingService * Creates a matcher that will indicate if the target address of the outgoing message equals the * provided address. */ - public static Matcher<InetAddress> to(InetAddress address) + public static Matcher<InetAddressAndPort> to(InetAddressAndPort address) { return (in, to) -> to == address || to.equals(address); } @@ -117,7 +118,7 @@ public class MockMessagingService */ public static <T> Matcher<?> all(Matcher<?>... matchers) { - return (MessageOut<T> out, InetAddress to) -> { + return (MessageOut<T> out, InetAddressAndPort to) -> { for (Matcher matcher : matchers) { if (!matcher.matches(out, to)) @@ -132,7 +133,7 @@ public class MockMessagingService */ public static <T> Matcher<?> any(Matcher<?>... 
matchers) { - return (MessageOut<T> out, InetAddress to) -> { + return (MessageOut<T> out, InetAddressAndPort to) -> { for (Matcher matcher : matchers) { if (matcher.matches(out, to)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java index 3f6564e..8d0f91b 100644 --- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java @@ -55,7 +55,7 @@ public class MockMessagingServiceTest public void testRequestResponse() throws InterruptedException, ExecutionException { // echo message that we like to mock as incoming reply for outgoing echo message - MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(), + MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(), EchoMessage.instance, Collections.emptyMap(), MessagingService.Verb.ECHO, @@ -63,14 +63,14 @@ public class MockMessagingServiceTest MockMessagingSpy spy = MockMessagingService .when( all( - to(FBUtilities.getBroadcastAddress()), + to(FBUtilities.getBroadcastAddressAndPort()), verb(MessagingService.Verb.ECHO) ) ) .respond(echoMessageIn); MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer); - MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback() + MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddressAndPort(), new IAsyncCallback() { public void response(MessageIn msg) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java index 93fe32e..4d1ae01 100644 --- a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java +++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java @@ -18,7 +18,6 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -29,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.utils.TestTimeSource; import org.apache.cassandra.utils.TimeSource; @@ -94,17 +94,17 @@ public class RateBasedBackPressureTest TestTimeSource timeSource = new TestTimeSource(); RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); state.onMessageSent(null); assertEquals(0, state.incomingRate.size()); assertEquals(0, state.outgoingRate.size()); - state = strategy.newState(InetAddress.getLoopbackAddress()); + state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); state.onResponseReceived(); assertEquals(1, state.incomingRate.size()); assertEquals(1, state.outgoingRate.size()); - state = strategy.newState(InetAddress.getLoopbackAddress()); + state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); state.onResponseTimeout(); assertEquals(0, state.incomingRate.size()); assertEquals(1, state.outgoingRate.size()); @@ -116,7 +116,7 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); 
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); // Get initial rate: double initialRate = state.rateLimiter.getRate(); @@ -140,7 +140,7 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); // Get initial time: long current = state.getLastIntervalAcquire(); @@ -174,7 +174,7 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); // Update incoming and outgoing rate so that the ratio is 0.5: state.incomingRate.update(50); @@ -194,7 +194,7 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress()); + RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress()); // Update incoming and 
outgoing rate so that the ratio is 0.5: state.incomingRate.update(50); @@ -236,9 +236,9 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize); - RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); - RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); - RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3")); // Update incoming and outgoing rates: state1.incomingRate.update(50); @@ -265,9 +265,9 @@ public class RateBasedBackPressureTest long windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); - RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); - RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); - RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3")); // Update incoming and outgoing rates: state1.incomingRate.update(50); @@ -294,10 +294,10 @@ public class RateBasedBackPressureTest long 
windowSize = 6000; TestTimeSource timeSource = new TestTimeSource(); TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); - RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); - RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); - RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); - RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4")); + RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3")); + RateBasedBackPressureState state4 = strategy.newState(InetAddressAndPort.getByName("127.0.0.4")); // Update incoming and outgoing rates: state1.incomingRate.update(50); // this @@ -333,9 +333,9 @@ public class RateBasedBackPressureTest long windowSize = 10000; TestTimeSource timeSource = new TestTimeSource(); TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize); - RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1")); - RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2")); - RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3")); + RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1")); + RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2")); + RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3")); // Update incoming and outgoing rates: state1.incomingRate.update(5); // 
slow http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java index 2d12baf..c8469a8 100644 --- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java +++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java @@ -18,7 +18,6 @@ */ package org.apache.cassandra.net; -import java.net.InetAddress; import java.util.UUID; import org.junit.BeforeClass; @@ -32,6 +31,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService.Verb; import org.apache.cassandra.schema.MockSchema; import org.apache.cassandra.schema.TableMetadata; @@ -65,7 +65,7 @@ public class WriteCallbackInfoTest ? 
new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build()) : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build()); - WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints); + WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints); Assert.assertEquals(expectHint, wcbi.shouldHint()); if (expectHint) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java index 128fe4b..0211512 100644 --- a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java +++ b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import com.google.common.net.InetAddresses; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -37,6 +38,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.embedded.EmbeddedChannel; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.ChannelWriter.CoalescingChannelWriter; @@ -68,8 +70,8 @@ public class ChannelWriterTest @Before public void setup() { - OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0), - 
new InetSocketAddress("127.0.0.2", 0)); + OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0), + InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0)); channel = new EmbeddedChannel(); omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty()); channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java index 100e1e0..f92ce5a 100644 --- a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java +++ b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Optional; +import com.google.common.net.InetAddresses; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -38,6 +39,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; @@ -50,8 +52,8 @@ public class HandshakeHandlersTest private static final String KEYSPACE1 = "NettyPipilineTest"; private static final String STANDARD1 = "Standard1"; - private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9999); - private static final 
InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999); + private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9999); + private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999); private static final int MESSAGING_VERSION = MessagingService.current_version; private static final OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(LOCAL_ADDR, REMOTE_ADDR); @@ -179,7 +181,7 @@ public class HandshakeHandlersTest InboundHandshakeHandler handler = new InboundHandshakeHandler(new TestAuthenticator(true)); EmbeddedChannel inboundChannel = new EmbeddedChannel(handler); - handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR.getAddress(), compress, MESSAGING_VERSION); + handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR, compress, MESSAGING_VERSION); return new TestChannels(outboundChannel, inboundChannel); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java index a3d646d..af48636 100644 --- a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java +++ b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java @@ -85,7 +85,7 @@ public class HandshakeProtocolTest @Test public void thirdMessageTest() throws Exception { - ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddress()); + ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddressAndPort()); buf = 
before.encode(PooledByteBufAllocator.DEFAULT); ThirdHandshakeMessage after = ThirdHandshakeMessage.maybeDecode(buf); assertEquals(before, after); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
