http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java index 84a4c32..52fc6ac 100644 --- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java +++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java @@ -19,10 +19,14 @@ package org.apache.cassandra.utils; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.*; +import java.util.stream.Collectors; import com.datastax.driver.core.*; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder; @@ -34,33 +38,46 @@ import org.apache.cassandra.dht.Token.TokenFactory; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.CQLTypeParser; +import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.Types; + public class NativeSSTableLoaderClient extends SSTableLoader.Client { protected final Map<String, TableMetadataRef> tables; - private final Collection<InetAddress> hosts; + private final Collection<InetSocketAddress> hosts; private final int port; + private final int storagePort; private final AuthProvider authProvider; private final SSLOptions sslOptions; + private final boolean allowServerPortDiscovery; - public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions) + public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, String username, String password, SSLOptions sslOptions, boolean allowServerPortDiscovery) { - this(hosts, port, new PlainTextAuthProvider(username, password), sslOptions); + this(hosts, nativePort, storagePort, new PlainTextAuthProvider(username, password), sslOptions, allowServerPortDiscovery); } - public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, AuthProvider authProvider, SSLOptions sslOptions) + public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, AuthProvider authProvider, SSLOptions sslOptions, boolean allowServerPortDiscovery) { super(); this.tables = new HashMap<>(); this.hosts = hosts; - this.port = port; + this.port = nativePort; this.authProvider = authProvider; this.sslOptions = sslOptions; + this.allowServerPortDiscovery = allowServerPortDiscovery; + this.storagePort = storagePort; } public void init(String keyspace) { - Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port); + Set<InetAddress> hostAddresses = hosts.stream().map(host -> host.getAddress()).collect(Collectors.toSet()); + Cluster.Builder builder = Cluster.builder().addContactPoints(hostAddresses).withPort(port).allowBetaProtocolVersion(); + + if (allowServerPortDiscovery) + builder = builder.allowServerPortDiscovery(); + if (sslOptions != null) builder.withSSL(sslOptions); if (authProvider != null) @@ -82,7 +99,18 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()), tokenFactory.fromString(tokenRange.getEnd().getValue().toString())); for (Host endpoint : endpoints) - addRangeForEndpoint(range, endpoint.getAddress()); + { + int portToUse; + if (allowServerPortDiscovery) + { + portToUse = endpoint.getBroadcastAddressOptPort().portOrElse(storagePort); + } + else + { + portToUse = storagePort; + } + addRangeForEndpoint(range, InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(), portToUse)); + } } Types types = fetchTypes(keyspace, session); @@ -91,6 +119,10 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client // We only need the TableMetadata for the views, so we only load that. tables.putAll(fetchViews(keyspace, session, partitioner, types)); } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } } public TableMetadataRef getTableMetadata(String tableName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 94b5832..19a0f83 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -356,7 +356,7 @@ public class UUIDGen * The spec says that one option is to take as many source that identify * this node as possible and hash them together. That's what we do here by * gathering all the ip of this host. - * Note that FBUtilities.getBroadcastAddress() should be enough to uniquely + * Note that FBUtilities.getJustBroadcastAddress() should be enough to uniquely * identify the node *in the cluster* but it triggers DatabaseDescriptor * instanciation and the UUID generator is used in Stress for instance, * where we don't want to require the yaml. http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index ead2a88..640f9b3 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -17,6 +17,7 @@ hints_directory: build/test/cassandra/hints partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner listen_address: 127.0.0.1 storage_port: 7010 +ssl_storage_port: 7011 start_native_transport: true native_transport_port: 9042 column_index_size_in_kb: 4 @@ -27,7 +28,7 @@ disk_access_mode: mmap seed_provider: - class_name: org.apache.cassandra.locator.SimpleSeedProvider parameters: - - seeds: "127.0.0.1" + - seeds: "127.0.0.1:7010" endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch dynamic_snitch: true server_encryption_options: http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.EndpointState.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin index fb7d168..17fc088 100644 Binary files a/test/data/serialization/4.0/gms.EndpointState.bin and b/test/data/serialization/4.0/gms.EndpointState.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.Gossip.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin index af5ac57..2fbd5d4 100644 Binary files a/test/data/serialization/4.0/gms.Gossip.bin and b/test/data/serialization/4.0/gms.Gossip.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin index ba84349..15cccb8 100644 Binary files a/test/data/serialization/4.0/service.SyncComplete.bin and b/test/data/serialization/4.0/service.SyncComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin index 6d688a4..f4eb532 100644 Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin index 7433d64..edc90b3 100644 Binary files a/test/data/serialization/4.0/service.ValidationComplete.bin and b/test/data/serialization/4.0/service.ValidationComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin index a00763b..e45eb70 100644 Binary files a/test/data/serialization/4.0/service.ValidationRequest.bin and b/test/data/serialization/4.0/service.ValidationRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java index 35bf5b4..a5025a3 100644 --- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java +++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.locator; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import org.junit.Test; @@ -53,19 +52,19 @@ public class DynamicEndpointSnitchLongTest StorageService.instance.unsafeInitialize(); SimpleSnitch ss = new SimpleSnitch(); DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); - InetAddress self = FBUtilities.getBroadcastAddress(); + InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); - List<InetAddress> hosts = new ArrayList<>(); + List<InetAddressAndPort> hosts = new ArrayList<>(); // We want a big list of hosts so sorting takes time, making it much more likely to reproduce the // problem we're looking for. for (int i = 0; i < 100; i++) for (int j = 0; j < 256; j++) - hosts.add(InetAddress.getByAddress(new byte[]{127, 0, (byte)i, (byte)j})); + hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j})); ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts); updater.start(); - List<InetAddress> result = null; + List<InetAddressAndPort> result = null; for (int i = 0; i < ITERATIONS; i++) result = dsnitch.getSortedListByProximity(self, hosts); @@ -85,10 +84,10 @@ public class DynamicEndpointSnitchLongTest public volatile boolean stopped; private final DynamicEndpointSnitch dsnitch; - private final List<InetAddress> hosts; + private final List<InetAddressAndPort> hosts; private final Random random = new Random(); - public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts) + public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts) { this.dsnitch = dsnitch; this.hosts = hosts; @@ -98,7 +97,7 @@ public class DynamicEndpointSnitchLongTest { while (!stopped) { - InetAddress host = hosts.get(random.nextInt(hosts.size())); + InetAddressAndPort host = hosts.get(random.nextInt(hosts.size())); int score = random.nextInt(SCORE_RANGE); dsnitch.receiveTiming(host, score); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/streaming/LongStreamingTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java index 799ac77..bd7ef20 100644 --- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java +++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java @@ -122,7 +122,7 @@ public class LongStreamingTest public void init(String keyspace) { for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); this.ks = keyspace; } @@ -149,7 +149,7 @@ public class LongStreamingTest public void init(String keyspace) { for (Range<Token> range : StorageService.instance.getLocalRanges(KS)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); + addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); this.ks = keyspace; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java index 9ec1aa6..68cfd7e 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java @@ -25,11 +25,11 @@ import com.google.common.collect.Multimap; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.PendingRangeMaps; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; import java.util.HashSet; @@ -50,7 +50,7 @@ public class PendingRangesBench PendingRangeMaps pendingRangeMaps; int maxToken = 256 * 100; - Multimap<Range<Token>, InetAddress> oldPendingRanges; + Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges; private Range<Token> genRange(String left, String right) { @@ -63,7 +63,7 @@ public class PendingRangesBench pendingRangeMaps = new PendingRangeMaps(); oldPendingRanges = HashMultimap.create(); - InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")}; + InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")}; for (int i = 0; i < maxToken; i++) { @@ -97,8 +97,8 @@ public class PendingRangesBench { int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5); Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken)); - Set<InetAddress> endpoints = new HashSet<>(); - for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet()) + Set<InetAddressAndPort> endpoints = new HashSet<>(); + for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet()) { if (entry.getKey().contains(searchToken)) endpoints.addAll(entry.getValue()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java index 0e4a3cf..d06caba 100644 --- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java +++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java @@ -20,8 +20,15 @@ package org.apache.cassandra; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.YamlConfigurationLoader; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.InetAddressAndPort; import java.io.File; +import java.net.Inet6Address; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.stream.Collectors; + +import com.google.common.base.Joiner; public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader @@ -52,6 +59,31 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader config.native_transport_port += offset; config.storage_port += offset; + //Rewrite the seed ports string + String[] hosts = config.seed_provider.parameters.get("seeds").split(",", -1); + String rewrittenSeeds = Joiner.on(", ").join(Arrays.stream(hosts).map(host -> { + StringBuilder sb = new StringBuilder(); + try + { + InetAddressAndPort address = InetAddressAndPort.getByName(host.trim()); + if (address.address instanceof Inet6Address) + { + sb.append('[').append(address.address.getHostAddress()).append(']'); + } + else + { + sb.append(address.address.getHostAddress()); + } + sb.append(':').append(address.port + offset); + return sb.toString(); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Error in OffsetAwareConfigurationLoader reworking seed list", e); + } + }).collect(Collectors.toList())); + config.seed_provider.parameters.put("seeds", rewrittenSeeds); + config.commitlog_directory += sep + offset; config.saved_caches_directory += sep + offset; config.hints_directory += sep + offset; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 7e62c41..1201efa 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -22,7 +22,6 @@ package org.apache.cassandra; import java.io.Closeable; import java.io.EOFException; import java.io.IOError; -import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; @@ -39,6 +38,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; @@ -192,7 +192,7 @@ public class Util * Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal' */ public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens, - List<Token> keyTokens, List<InetAddress> hosts, List<UUID> hostIds, int howMany) + List<Token> keyTokens, List<InetAddressAndPort> hosts, List<UUID> hostIds, int howMany) throws UnknownHostException { // Expand pool of host IDs as necessary @@ -210,10 +210,13 @@ public class Util for (int i=0; i<endpointTokens.size(); i++) { - InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); + InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1)); Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1); Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i)))); ss.onChange(ep, + ApplicationState.STATUS_WITH_PORT, + new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)))); + ss.onChange(ep, ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)))); hosts.add(ep); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java index 7db1cfa..41564d9 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java @@ -17,11 +17,9 @@ */ package org.apache.cassandra.batchlog; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import com.google.common.collect.ImmutableMultimap; @@ -29,6 +27,8 @@ import com.google.common.collect.Multimap; import org.junit.Test; import org.junit.matchers.JUnitMatchers; +import org.apache.cassandra.locator.InetAddressAndPort; + import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -39,89 +39,89 @@ public class BatchlogEndpointFilterTest @Test public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException { - Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() - .put(LOCAL, InetAddress.getByName("0")) - .put(LOCAL, InetAddress.getByName("00")) - .put("1", InetAddress.getByName("1")) - .put("1", InetAddress.getByName("11")) - .put("2", InetAddress.getByName("2")) - .put("2", InetAddress.getByName("22")) + Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() + .put(LOCAL, InetAddressAndPort.getByName("0")) + .put(LOCAL, InetAddressAndPort.getByName("00")) + .put("1", InetAddressAndPort.getByName("1")) + .put("1", InetAddressAndPort.getByName("11")) + .put("2", InetAddressAndPort.getByName("2")) + .put("2", InetAddressAndPort.getByName("22")) .build(); - Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22"))); } @Test public void shouldSelectHostFromLocal() throws UnknownHostException { - Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() - .put(LOCAL, InetAddress.getByName("0")) - .put(LOCAL, InetAddress.getByName("00")) - .put("1", InetAddress.getByName("1")) + Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() + .put(LOCAL, InetAddressAndPort.getByName("0")) + .put(LOCAL, InetAddressAndPort.getByName("00")) + .put("1", InetAddressAndPort.getByName("1")) .build(); - Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1"))); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0"))); } @Test public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException { - Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() - .put(LOCAL, InetAddress.getByName("0")) + Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() + .put(LOCAL, InetAddressAndPort.getByName("0")) .build(); - Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); assertThat(result.size(), is(1)); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0"))); } @Test public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException { - Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() - .put(LOCAL, InetAddress.getByName("0")) - .put(LOCAL, InetAddress.getByName("00")) - .put("1", InetAddress.getByName("1")) - .put("1", InetAddress.getByName("11")) - .put("1", InetAddress.getByName("111")) + Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() + .put(LOCAL, InetAddressAndPort.getByName("0")) + .put(LOCAL, InetAddressAndPort.getByName("00")) + .put("1", InetAddressAndPort.getByName("1")) + .put("1", InetAddressAndPort.getByName("11")) + .put("1", InetAddressAndPort.getByName("111")) .build(); - Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); // result should be the last two non-local replicas // (Collections.shuffle has been replaced with Collections.reverse for testing) assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11"))); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111"))); } @Test public void shouldSelectTwoRandomHostsFromSingleRack() throws UnknownHostException { - Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder() - .put(LOCAL, InetAddress.getByName("1")) - .put(LOCAL, InetAddress.getByName("11")) - .put(LOCAL, InetAddress.getByName("111")) - .put(LOCAL, InetAddress.getByName("1111")) + Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder() + .put(LOCAL, InetAddressAndPort.getByName("1")) + .put(LOCAL, InetAddressAndPort.getByName("11")) + .put(LOCAL, InetAddressAndPort.getByName("111")) + .put(LOCAL, InetAddressAndPort.getByName("1111")) .build(); - Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter(); + Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter(); // result should be the last two non-local replicas // (Collections.shuffle has been replaced with Collections.reverse for testing) assertThat(result.size(), is(2)); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111"))); - assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1111"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111"))); + assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111"))); } private static class TestEndpointFilter extends BatchlogManager.EndpointFilter { - TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints) + TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints) { super(localRack, endpoints); } @Override - protected boolean isValid(InetAddress input) + protected boolean isValid(InetAddressAndPort input) { // We will use always alive non-localhost endpoints return true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java index 34902fe..33fb209 100644 --- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.batchlog; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.ExecutionException; @@ -29,6 +28,7 @@ import org.junit.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.Util.PartitionerSwitcher; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -94,7 +94,7 @@ public class BatchlogManagerTest public void setUp() throws Exception { TokenMetadata metadata = StorageService.instance.getTokenMetadata(); - InetAddress localhost = InetAddress.getByName("127.0.0.1"); + InetAddressAndPort localhost = InetAddressAndPort.getByName("127.0.0.1"); metadata.updateNormalToken(Util.token("A"), localhost); metadata.updateHostId(UUIDGen.getTimeUUID(), localhost); Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking(); @@ -344,7 +344,7 @@ public class BatchlogManagerTest @Test public void testReplayWithNoPeers() throws Exception { - StorageService.instance.getTokenMetadata().removeEndpoint(InetAddress.getByName("127.0.0.1")); + StorageService.instance.getTokenMetadata().removeEndpoint(InetAddressAndPort.getByName("127.0.0.1")); long initialAllBatches = BatchlogManager.instance.countAllBatches(); long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index b50a050..589afd5 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -77,6 +77,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions", "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions", "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$InternodeEncryption", + "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$OutgoingEncryptedPortSource", "org.apache.cassandra.config.YamlConfigurationLoader", "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker", "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1", @@ -126,6 +127,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer", "org.apache.cassandra.ConsoleAppenderBeanInfo", "org.apache.cassandra.ConsoleAppenderCustomizer", + "org.apache.cassandra.locator.InetAddressAndPort" }; static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 6993a65..69d2fb5 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -48,6 +48,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.config.EncryptionOptions; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.functions.FunctionName; @@ -144,9 +145,9 @@ public abstract class CQLTester // Register an EndpointSnitch which returns fixed values for test. DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { - @Override public String getRack(InetAddress endpoint) { return RACK1; } - @Override public String getDatacenter(InetAddress endpoint) { return DATA_CENTER; } - @Override public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; } + @Override public String getRack(InetAddressAndPort endpoint) { return RACK1; } + @Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; } + @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } }); try @@ -829,6 +830,11 @@ public abstract class CQLTester return sessionNet(protocolVersion).execute(formatQuery(query), values); } + protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable + { + return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize)); + } + protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable { return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index 0a314da..ce5de62 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -223,7 +223,6 @@ public class PreparedStatementsTest extends CQLTester .withClusterName("Test Cluster") .withPort(nativePort) .withoutJMXReporting() - .allowBetaProtocolVersion() .build()) { try (Session newSession = newCluster.connect()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java index 26b9d65..665bc44 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java @@ -851,10 +851,10 @@ public class ViewComplexTest extends CQLTester for (String view : Arrays.asList("mv1", "mv2")) { // paging - assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size()); - assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size()); - assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s", view), 1).all().size()); - assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b FROM %s ", view), 1), + assertEquals(1, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size()); + assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size()); + assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s", view), 1).all().size()); + assertRowsNet(protocolVersion, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s ", view), 1), row(50, 50, 50), row(100, 100, 100)); // limit http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java index ac261ca..dc90b4e 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.cql3.validation.operations; -import java.net.InetAddress; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; @@ -26,6 +25,7 @@ import java.util.UUID; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -514,13 +514,13 @@ public class CreateTest extends CQLTester DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() { @Override - public String getRack(InetAddress endpoint) { return RACK1; } + public String getRack(InetAddressAndPort endpoint) { return RACK1; } @Override - public String getDatacenter(InetAddress endpoint) { return "us-east-1"; } + public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; } @Override - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; } + public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; } }); execute("CREATE KEYSPACE Foo WITH replication = { 'class' : 'NetworkTopologyStrategy', 'us-east-1' : 1 };"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/CleanupTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java index 044a49e..a096c78 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.AbstractMap; @@ -37,6 +36,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -89,13 +89,13 @@ public class CleanupTest DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch() { @Override - public String getRack(InetAddress endpoint) + public String getRack(InetAddressAndPort endpoint) { return "RC1"; } @Override - public String getDatacenter(InetAddress endpoint) + public String getDatacenter(InetAddressAndPort endpoint) { return "DC1"; } @@ -166,8 +166,8 @@ public class CleanupTest byte[] tk1 = new byte[1], tk2 = new byte[1]; tk1[0] = 2; tk2[0] = 1; - tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); + tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); CompactionManager.instance.performCleanup(cfs, 2); @@ -198,8 +198,8 @@ public class CleanupTest byte[] tk1 = new byte[1], tk2 = new byte[1]; tk1[0] = 2; tk2[0] = 1; - tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); + tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); CompactionManager.instance.performCleanup(cfs, 2); assertEquals(0, Util.getAll(Util.cmd(cfs).build()).size()); @@ -222,9 +222,9 @@ public class CleanupTest TokenMetadata tmd = StorageService.instance.getTokenMetadata(); tmd.clearUnsafe(); - tmd.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.0.0.1")); + tmd.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.0.0.1")); byte[] tk1 = {2}; - tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); Keyspace keyspace = Keyspace.open(KEYSPACE2); @@ -270,8 +270,8 @@ public class CleanupTest byte[] tk1 = new byte[1], tk2 = new byte[1]; tk1[0] = 2; tk2[0] = 1; - tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); + tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); for(SSTableReader r: cfs.getLiveSSTables()) CompactionManager.instance.forceUserDefinedCleanup(r.getFilename()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java index c56368f..6e2a714 100644 --- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.File; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; @@ -30,6 +29,7 @@ import org.junit.Test; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.dht.BootStrapper; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; @@ -51,7 +51,7 @@ public class DiskBoundaryManagerTest extends CQLTester { BlacklistedDirectories.clearUnwritableUnsafe(); TokenMetadata metadata = StorageService.instance.getTokenMetadata(); - metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress()); + metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort()); createTable("create table %s (id int primary key, x text)"); dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")), new Directories.DataDirectory(new File("/tmp/2")), @@ -86,7 +86,7 @@ public class DiskBoundaryManagerTest extends CQLTester public void updateTokensTest() throws UnknownHostException { DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock); - StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10")); + StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddressAndPort.getByName("127.0.0.10")); DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock); assertFalse(dbv1.equals(dbv2)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/ReadCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 64ac627..6bb0a1a 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -50,9 +50,11 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -312,7 +314,8 @@ public class ReadCommandTest } } - public void serializerTest() throws IOException + @Test + public void testSerializer() throws IOException { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2); @@ -324,10 +327,11 @@ public class ReadCommandTest ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").build(); int messagingVersion = MessagingService.current_version; - long size = ReadCommand.serializer.serializedSize(readCommand, messagingVersion); - FakeOutputStream out = new FakeOutputStream(); - ReadCommand.serializer.serialize(readCommand, new WrappedDataOutputStreamPlus(out), messagingVersion); + Tracing.instance.newSession(Tracing.TraceType.QUERY); + MessageOut<ReadCommand> messageOut = new MessageOut(MessagingService.Verb.READ, readCommand, ReadCommand.serializer); + long size = messageOut.serializedSize(messagingVersion); + messageOut.serialize(new WrappedDataOutputStreamPlus(out), messagingVersion); Assert.assertEquals(size, out.count); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java index fee3f2c..5ca1eef 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +31,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.cache.RowCacheKey; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.rows.*; @@ -300,8 +300,8 @@ public class RowCacheTest byte[] tk1, tk2; tk1 = "key1000".getBytes(); tk2 = "key1050".getBytes(); - tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1")); - tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2")); + tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1")); + tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2")); store.cleanupCache(); assertEquals(50, CacheService.instance.rowCache.size()); CacheService.instance.setRowCacheCapacityInMB(0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java new file mode 100644 index 0000000..1c051f5 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.UUID; + + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.utils.UUIDGen; + +import static org.junit.Assert.assertEquals; + +public class SystemKeyspaceMigrator40Test extends CQLTester +{ + @Test + public void testMigratePeers() throws Throwable + { + String insert = String.format("INSERT INTO %s (" + + "peer, " + + "data_center, " + + "host_id, " + + "preferred_ip, " + + "rack, " + + "release_version, " + + "rpc_address, " + + "schema_version, " + + "tokens) " + + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?)", + SystemKeyspaceMigrator40.legacyPeersName); + UUID hostId = UUIDGen.getTimeUUID(); + UUID schemaVersion = UUIDGen.getTimeUUID(); + execute(insert, + InetAddress.getByName("127.0.0.1"), + "dcFoo", + hostId, + InetAddress.getByName("127.0.0.2"), + "rackFoo", "4.0", + InetAddress.getByName("127.0.0.3"), + schemaVersion, + ImmutableSet.of("foobar")); + SystemKeyspaceMigrator40.migrate(); + + int rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName))) + { + rowCount++; + assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); + assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); + assertEquals("dcFoo", row.getString("data_center")); + assertEquals(hostId, row.getUUID("host_id")); + assertEquals(InetAddress.getByName("127.0.0.2"), row.getInetAddress("preferred_ip")); + assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("preferred_port")); + assertEquals("rackFoo", row.getString("rack")); + assertEquals("4.0", row.getString("release_version")); + assertEquals(InetAddress.getByName("127.0.0.3"), row.getInetAddress("native_address")); + assertEquals(DatabaseDescriptor.getNativeTransportPort(), row.getInt("native_port")); + assertEquals(schemaVersion, row.getUUID("schema_version")); + assertEquals(ImmutableSet.of("foobar"), row.getSet("tokens", UTF8Type.instance)); + } + assertEquals(1, rowCount); + + //Test nulls/missing don't prevent the row from propagating + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeersName)); + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peersName)); + + execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeersName), + InetAddress.getByName("127.0.0.1")); + SystemKeyspaceMigrator40.migrate(); + + rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName))) + { + rowCount++; + } + assertEquals(1, rowCount); + } + + @Test + public void testMigratePeerEvents() throws Throwable + { + String insert = String.format("INSERT INTO %s (" + + "peer, " + + "hints_dropped) " + + " values ( ?, ? )", + SystemKeyspaceMigrator40.legacyPeerEventsName); + UUID uuid = UUIDGen.getTimeUUID(); + execute(insert, + InetAddress.getByName("127.0.0.1"), + ImmutableMap.of(uuid, 42)); + SystemKeyspaceMigrator40.migrate(); + + int rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName))) + { + rowCount++; + assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); + assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); + assertEquals(ImmutableMap.of(uuid, 42), row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance)); + } + assertEquals(1, rowCount); + + //Test nulls/missing don't prevent the row from propagating + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeerEventsName)); + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peerEventsName)); + + execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeerEventsName), + InetAddress.getByName("127.0.0.1")); + SystemKeyspaceMigrator40.migrate(); + + rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName))) + { + rowCount++; + } + assertEquals(1, rowCount); + } + + @Test + public void testMigrateTransferredRanges() throws Throwable + { + String insert = String.format("INSERT INTO %s (" + + "operation, " + + "peer, " + + "keyspace_name, " + + "ranges) " + + " values ( ?, ?, ?, ? )", + SystemKeyspaceMigrator40.legacyTransferredRangesName); + execute(insert, + "foo", + InetAddress.getByName("127.0.0.1"), + "bar", + ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 }))); + SystemKeyspaceMigrator40.migrate(); + + int rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName))) + { + rowCount++; + assertEquals("foo", row.getString("operation")); + assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer")); + assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port")); + assertEquals("bar", row.getString("keyspace_name")); + assertEquals(ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })), row.getSet("ranges", BytesType.instance)); + } + assertEquals(1, rowCount); + + //Test nulls/missing don't prevent the row from propagating + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyTransferredRangesName)); + execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.transferredRangesName)); + + execute(String.format("INSERT INTO %s (operation, peer, keyspace_name) VALUES (?, ?, ?)", SystemKeyspaceMigrator40.legacyTransferredRangesName), + "foo", + InetAddress.getByName("127.0.0.1"), + "bar"); + SystemKeyspaceMigrator40.migrate(); + + rowCount = 0; + for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName))) + { + rowCount++; + } + assertEquals(1, rowCount); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java index d1b8ff5..3bc04c1 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; @@ -26,6 +25,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -53,7 +53,7 @@ public class SystemKeyspaceTest public void testLocalTokens() { // Remove all existing tokens - Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddress()); + Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddressAndPort()); if (current != null && !current.isEmpty()) SystemKeyspace.updateTokens(current); @@ -74,7 +74,7 @@ public class SystemKeyspaceTest public void testNonLocalToken() throws UnknownHostException { BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3")); - InetAddress address = InetAddress.getByName("127.0.0.2"); + InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.2"); SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token)); assert SystemKeyspace.loadTokens().get(address).contains(token); SystemKeyspace.removeEndpoint(address); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java index 0baad3b..3e38dfc 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; -import java.net.InetAddress; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -31,6 +30,7 @@ import org.junit.Ignore; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.cql3.QueryProcessor; @@ -67,7 +67,7 @@ public class AbstractPendingRepairTest extends AbstractRepairTest // cutoff messaging service MessagingService.instance().addMessageSink(new IMessageSink() { - public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to) { return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java index 5f05fab..6e7e184 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -35,6 +34,7 @@ import org.junit.After; import org.junit.Ignore; import org.junit.Test; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; @@ -96,7 +96,7 @@ public class AntiCompactionTest private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException { ActiveRepairService.instance.registerParentRepairSession(sessionID, - InetAddress.getByName("10.0.0.1"), + InetAddressAndPort.getByName("10.0.0.1"), Lists.newArrayList(cfs), ranges, pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE, repairedAt, true, PreviewKind.NONE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java index b9e3c17..8290adf 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.compaction; -import java.net.InetAddress; import java.util.Collections; import java.util.Iterator; import java.util.Set; @@ -33,6 +32,7 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.Schema; @@ -58,7 +58,7 @@ public class CompactionManagerGetSSTablesForValidationTest private String ks; private static final String tbl = "tbl"; private ColumnFamilyStore cfs; - private static InetAddress coordinator; + private static InetAddressAndPort coordinator; private static Token MT; @@ -73,7 +73,7 @@ public class CompactionManagerGetSSTablesForValidationTest public static void setupClass() throws Exception { SchemaLoader.prepareServer(); - coordinator = InetAddress.getByName("10.0.0.1"); + coordinator = InetAddressAndPort.getByName("10.0.0.1"); MT = DatabaseDescriptor.getPartitioner().getMinimumToken(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 624f119..567984d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -195,7 +195,7 @@ public class LeveledCompactionStrategyTest int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); ActiveRepairService.instance.registerParentRepairSession(parentRepSession, - FBUtilities.getBroadcastAddress(), + FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(cfs), Arrays.asList(range), false, @@ -203,7 +203,7 @@ public class LeveledCompactionStrategyTest true, PreviewKind.NONE); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); - Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), gcBefore, PreviewKind.NONE); CompactionManager.instance.submitValidation(cfs, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java index 658b87a..423ad28 100644 --- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java +++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.view; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -28,6 +27,7 @@ import org.junit.Test; import junit.framework.Assert; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken; @@ -59,12 +59,12 @@ public class ViewUtilsTest metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> replicationMap = new HashMap<>(); replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName()); @@ -76,12 +76,12 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("CA"), - new StringToken("BB")); + Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("CA"), + new StringToken("BB")); Assert.assertTrue(naturalEndpoint.isPresent()); - Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get()); + Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get()); } @@ -92,12 +92,12 @@ public class ViewUtilsTest metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> replicationMap = new HashMap<>(); replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName()); @@ -109,12 +109,12 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("CA"), - new StringToken("BB")); + Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("CA"), + new StringToken("BB")); Assert.assertTrue(naturalEndpoint.isPresent()); - Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get()); + Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get()); } @Test @@ -124,12 +124,12 @@ public class ViewUtilsTest metadata.clearUnsafe(); // DC1 - metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); - metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2")); // DC2 - metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); - metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5")); Map<String, String> replicationMap = new HashMap<>(); replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName()); @@ -141,9 +141,9 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("AB"), - new StringToken("BB")); + Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("AB"), + new StringToken("BB")); Assert.assertFalse(naturalEndpoint.isPresent()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index a1054bb..f11cb62 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.dht; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; import java.util.HashSet; @@ -41,6 +40,7 @@ import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.tokenallocator.TokenAllocation; @@ -96,32 +96,32 @@ public class BootStrapperTest generateFakeEndpoints(numOldNodes); Token myToken = tmd.partitioner.getRandomToken(); - InetAddress myEndpoint = InetAddress.getByName("127.0.0.1"); + InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); assertEquals(numOldNodes, tmd.sortedTokens().size()); RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1); IFailureDetector mockFailureDetector = new IFailureDetector() { - public boolean isAlive(InetAddress ep) + public boolean isAlive(InetAddressAndPort ep) { return true; } - public void interpret(InetAddress ep) { throw new UnsupportedOperationException(); } - public void report(InetAddress ep) { throw new UnsupportedOperationException(); } + public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } - public void remove(InetAddress ep) { throw new UnsupportedOperationException(); } - public void forceConviction(InetAddress ep) { throw new UnsupportedOperationException(); } + public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } }; s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector)); s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); - Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName); + Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName); // Check we get get RF new ranges in total Set<Range<Token>> ranges = new HashSet<>(); - for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch) + for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch) ranges.addAll(e.getValue()); assertEquals(replicationFactor, ranges.size()); @@ -151,7 +151,7 @@ public class BootStrapperTest for (int i = 1; i <= numOldNodes; i++) { // leave .1 for myEndpoint - InetAddress addr = InetAddress.getByName("127." + dc + "." + rack + "." + (i + 1)); + InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + "." + rack + "." + (i + 1)); List<Token> tokens = Lists.newArrayListWithCapacity(numVNodes); for (int j = 0; j < numVNodes; ++j) tokens.add(p.getRandomToken()); @@ -167,7 +167,7 @@ public class BootStrapperTest String ks = "BootStrapperTestKeyspace3"; TokenMetadata tm = new TokenMetadata(); generateFakeEndpoints(tm, 10, vn); - InetAddress addr = FBUtilities.getBroadcastAddress(); + InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort(); allocateTokensForNode(vn, ks, tm, addr); } @@ -184,15 +184,15 @@ public class BootStrapperTest // Register peers with expected DC for NetworkTopologyStrategy. TokenMetadata metadata = StorageService.instance.getTokenMetadata(); metadata.clearUnsafe(); - metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.99")); - metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.15.0.99")); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.99")); + metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.15.0.99")); SchemaLoader.createKeyspace(ks, KeyspaceParams.nts(dc, replicas, "15", 15), SchemaLoader.standardCFMD(ks, "Standard1")); TokenMetadata tm = StorageService.instance.getTokenMetadata(); tm.clearUnsafe(); for (int i = 0; i < rackCount; ++i) generateFakeEndpoints(tm, 10, vn, dc, Integer.toString(i)); - InetAddress addr = InetAddress.getByName("127." + dc + ".0.99"); + InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + ".0.99"); allocateTokensForNode(vn, ks, tm, addr); // Note: Not matching replication factor in second datacentre, but this should not affect us. } finally { @@ -230,7 +230,7 @@ public class BootStrapperTest testAllocateTokensNetworkStrategy(1, 1); } - private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddress addr) + private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddressAndPort addr) { SummaryStatistics os = TokenAllocation.replicatedOwnershipStats(tm.cloneOnlyTokenMap(), Keyspace.open(ks).getReplicationStrategy(), addr); Collection<Token> tokens = BootStrapper.allocateTokens(tm, addr, ks, vn, 0); @@ -260,14 +260,14 @@ public class BootStrapperTest TokenMetadata tm = new TokenMetadata(); generateFakeEndpoints(tm, 10, vn); - InetAddress dcaddr = FBUtilities.getBroadcastAddress(); + InetAddressAndPort dcaddr = FBUtilities.getBroadcastAddressAndPort(); SummaryStatistics os3 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks3).getReplicationStrategy(), dcaddr); SummaryStatistics os2 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks2).getReplicationStrategy(), dcaddr); String cks = ks3; String nks = ks2; for (int i=11; i<=20; ++i) { - allocateTokensForNode(vn, cks, tm, InetAddress.getByName("127.0.0." + (i + 1))); + allocateTokensForNode(vn, cks, tm, InetAddressAndPort.getByName("127.0.0." + (i + 1))); String t = cks; cks = nks; nks = t; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
