http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 294731a..4f7cde0 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -20,8 +20,6 @@ package org.apache.cassandra.service; import java.util.*; -import javax.xml.crypto.Data; - import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.junit.Assert; @@ -36,13 +34,13 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.streaming.PreviewKind; @@ -107,13 +105,13 @@ public class ActiveRepairServiceTest public void testGetNeighborsPlusOne() throws Throwable { // generate rf+1 nodes, and ensure that all nodes are returned - Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); expected.remove(FBUtilities.getBroadcastAddressAndPort()); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); Set<InetAddressAndPort> neighbors = new HashSet<>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null).endpoints()); } assertEquals(expected, neighbors); } @@ -124,19 +122,19 @@ public class ActiveRepairServiceTest TokenMetadata tmd = StorageService.instance.getTokenMetadata(); // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned - addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); Set<InetAddressAndPort> expected = new HashSet<>(); - for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort())) + for (Replica replica : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort())) { - expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints()); } expected.remove(FBUtilities.getBroadcastAddressAndPort()); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); Set<InetAddressAndPort> neighbors = new HashSet<>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null).endpoints()); } assertEquals(expected, neighbors); } @@ -147,18 +145,18 @@ public class ActiveRepairServiceTest TokenMetadata tmd = StorageService.instance.getTokenMetadata(); // generate rf+1 nodes, and ensure that all nodes are returned - Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + Set<InetAddressAndPort> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); expected.remove(FBUtilities.getBroadcastAddressAndPort()); // remove remote endpoints TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); HashSet<InetAddressAndPort> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); expected = Sets.intersection(expected, localEndpoints); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); Set<InetAddressAndPort> neighbors = new HashSet<>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null).endpoints()); } assertEquals(expected, neighbors); } @@ -169,12 +167,12 @@ public class ActiveRepairServiceTest TokenMetadata tmd = StorageService.instance.getTokenMetadata(); // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned - addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); Set<InetAddressAndPort> expected = new HashSet<>(); - for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort())) + for (Replica replica : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort())) { - expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replica.range()).endpoints()); } expected.remove(FBUtilities.getBroadcastAddressAndPort()); // remove remote endpoints @@ -182,11 +180,11 @@ public class ActiveRepairServiceTest HashSet<InetAddressAndPort> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); expected = Sets.intersection(expected, localEndpoints); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); Set<InetAddressAndPort> neighbors = new HashSet<>(); for (Range<Token> range : ranges) { - neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null).endpoints()); } assertEquals(expected, neighbors); } @@ -197,30 +195,30 @@ public class ActiveRepairServiceTest TokenMetadata tmd = StorageService.instance.getTokenMetadata(); // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned - addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); List<InetAddressAndPort> expected = new ArrayList<>(); - for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddressAndPort())) + for (Replica replicas : ars.getAddressReplicas().get(FBUtilities.getBroadcastAddressAndPort())) { - expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicas.range()).endpoints()); } expected.remove(FBUtilities.getBroadcastAddressAndPort()); Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddressAndPort().toString(),expected.get(0).toString()); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), - null, hosts).iterator().next()); + null, hosts).endpoints().iterator().next()); } @Test(expected = IllegalArgumentException.class) public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable { - addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor().allReplicas); //Dont give local endpoint Collection<String> hosts = Arrays.asList("127.0.0.3"); - Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Iterable<Range<Token>> ranges = StorageService.instance.getLocalReplicas(KEYSPACE5).ranges(); ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java new file mode 100644 index 0000000..63973ea --- /dev/null +++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.EndpointsForRange; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.OrderPreservingPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.RangeStreamer; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractEndpointSnitch; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder; + +/** + * This is also fairly effectively testing source retrieval for bootstrap as well since RangeStreamer + * is used to calculate the endpoints to fetch from and check they are alive for both RangeRelocator (move) and + * bootstrap (RangeRelocator). + */ +public class BootstrapTransientTest +{ + static InetAddressAndPort aAddress; + static InetAddressAndPort bAddress; + static InetAddressAndPort cAddress; + static InetAddressAndPort dAddress; + + @BeforeClass + public static void setUpClass() throws Exception + { + aAddress = InetAddressAndPort.getByName("127.0.0.1"); + bAddress = InetAddressAndPort.getByName("127.0.0.2"); + cAddress = InetAddressAndPort.getByName("127.0.0.3"); + dAddress = InetAddressAndPort.getByName("127.0.0.4"); + } + + private final List<InetAddressAndPort> downNodes = new ArrayList<>(); + Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint()); + + private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>(); + private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint())); + + @After + public void clearDownNode() + { + downNodes.clear(); + sourceFilterDownNodes.clear(); + } + + @BeforeClass + public static void setupDD() + { + DatabaseDescriptor.daemonInitialization(); + } + + Token tenToken = new OrderPreservingPartitioner.StringToken("00010"); + Token twentyToken = new OrderPreservingPartitioner.StringToken("00020"); + Token thirtyToken = new OrderPreservingPartitioner.StringToken("00030"); + Token fourtyToken = new OrderPreservingPartitioner.StringToken("00040"); + + Range<Token> aRange = new Range<>(thirtyToken, tenToken); + Range<Token> bRange = new Range<>(tenToken, twentyToken); + Range<Token> cRange = new Range<>(twentyToken, thirtyToken); + Range<Token> dRange = new Range<>(thirtyToken, fourtyToken); + + RangesAtEndpoint toFetch = RangesAtEndpoint.of(new Replica(dAddress, dRange, true), + new Replica(dAddress, cRange, true), + new Replica(dAddress, bRange, false)); + + @Test + public void testRangeStreamerRangesToFetch() throws Exception + { + EndpointsByReplica expectedResult = new EndpointsByReplica(ImmutableMap.of( + fullReplica(dAddress, dRange), EndpointsForRange.builder(aRange).add(fullReplica(bAddress, aRange)).add(transientReplica(cAddress, aRange)).build(), + fullReplica(dAddress, cRange), EndpointsForRange.builder(cRange).add(fullReplica(cAddress, cRange)).add(transientReplica(bAddress, cRange)).build(), + transientReplica(dAddress, bRange), EndpointsForRange.builder(bRange).add(transientReplica(aAddress, bRange)).build())); + + invokeCalculateRangesToFetchWithPreferredEndpoints(toFetch, constructTMDs(), expectedResult); + } + + private Pair<TokenMetadata, TokenMetadata> constructTMDs() + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(aRange.right, aAddress); + tmd.updateNormalToken(bRange.right, bAddress); + tmd.updateNormalToken(cRange.right, cAddress); + TokenMetadata updated = tmd.cloneOnlyTokenMap(); + updated.updateNormalToken(dRange.right, dAddress); + + return Pair.create(tmd, updated); + } + + private void invokeCalculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> toFetch, + Pair<TokenMetadata, TokenMetadata> tmds, + EndpointsByReplica expectedResult) + { + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + + EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas, + simpleStrategy(tmds.left), + toFetch, + true, + tmds.left, + tmds.right, + alivePredicate, + "OldNetworkTopologyStrategyTest", + sourceFilters); + result.asMap().forEach((replica, list) -> System.out.printf("Replica %s, sources %s%n", replica, list)); + assertMultimapEqualsIgnoreOrder(expectedResult, result); + + } + + private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd) + { + IEndpointSnitch snitch = new AbstractEndpointSnitch() + { + public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2) + { + return 0; + } + + public String getRack(InetAddressAndPort endpoint) + { + return "R1"; + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + return "DC1"; + } + }; + + return new SimpleStrategy("MoveTransientTest", + tmd, + snitch, + com.google.common.collect.ImmutableMap.of("replication_factor", "3/1")); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index 8ddc4f0..3c4748e 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -130,10 +130,10 @@ public class LeaveAndBootstrapTest strategy = getStrategy(keyspaceName, tmd); for (Token token : keyTokens) { - int replicationFactor = strategy.getReplicationFactor(); + int replicationFactor = strategy.getReplicationFactor().allReplicas; - HashSet<InetAddressAndPort> actual = new HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap()))); - HashSet<InetAddressAndPort> expected = new HashSet<>(); + Set<InetAddressAndPort> actual = tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalReplicas(token, tmd.cloneOnlyTokenMap()).forToken(token)).endpoints(); + Set<InetAddressAndPort> expected = new HashSet<>(); for (int i = 0; i < replicationFactor; i++) { @@ -198,8 +198,6 @@ public class LeaveAndBootstrapTest ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)))); - Collection<InetAddressAndPort> endpoints = null; - /* don't require test update every time a new keyspace is added to test/conf/cassandra.yaml */ Map<String, AbstractReplicationStrategy> keyspaceStrategyMap = new HashMap<String, AbstractReplicationStrategy>(); for (int i=1; i<=4; i++) @@ -263,18 +261,18 @@ public class LeaveAndBootstrapTest for (int i = 0; i < keyTokens.size(); i++) { - endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints(); assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size()); assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints)); } // just to be sure that things still work according to the old tests, run them: - if (strategy.getReplicationFactor() != 3) + if (strategy.getReplicationFactor().allReplicas != 3) continue; // tokens 5, 15 and 25 should go three nodes for (int i=0; i<3; ++i) { - endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(i+1))); assertTrue(endpoints.contains(hosts.get(i+2))); @@ -282,7 +280,7 @@ public class LeaveAndBootstrapTest } // token 35 should go to nodes 4, 5, 6, 7 and boot1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints(); assertEquals(5, endpoints.size()); assertTrue(endpoints.contains(hosts.get(4))); assertTrue(endpoints.contains(hosts.get(5))); @@ -291,7 +289,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(boot1)); // token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints(); assertEquals(6, endpoints.size()); assertTrue(endpoints.contains(hosts.get(5))); assertTrue(endpoints.contains(hosts.get(6))); @@ -301,7 +299,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(boot2)); // token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints(); assertEquals(7, endpoints.size()); assertTrue(endpoints.contains(hosts.get(6))); assertTrue(endpoints.contains(hosts.get(7))); @@ -312,7 +310,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(boot2)); // token 65 should go to nodes 7, 8, 9, 0, 1 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints(); assertEquals(6, endpoints.size()); assertTrue(endpoints.contains(hosts.get(7))); assertTrue(endpoints.contains(hosts.get(8))); @@ -322,7 +320,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(boot2)); // token 75 should to go nodes 8, 9, 0, 1, 2 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints(); assertEquals(6, endpoints.size()); assertTrue(endpoints.contains(hosts.get(8))); assertTrue(endpoints.contains(hosts.get(9))); @@ -332,7 +330,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(boot2)); // token 85 should go to nodes 9, 0, 1 and 2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints(); assertEquals(4, endpoints.size()); assertTrue(endpoints.contains(hosts.get(9))); assertTrue(endpoints.contains(hosts.get(0))); @@ -340,7 +338,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(hosts.get(2))); // token 95 should go to nodes 0, 1 and 2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(0))); assertTrue(endpoints.contains(hosts.get(1))); @@ -385,18 +383,18 @@ public class LeaveAndBootstrapTest for (int i = 0; i < keyTokens.size(); i++) { - endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints(); assertEquals(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).size(), endpoints.size()); assertTrue(expectedEndpoints.get(keyspaceName).get(keyTokens.get(i)).containsAll(endpoints)); } - if (strategy.getReplicationFactor() != 3) + if (strategy.getReplicationFactor().allReplicas != 3) continue; // leave this stuff in to guarantee the old tests work the way they were supposed to. // tokens 5, 15 and 25 should go three nodes for (int i=0; i<3; ++i) { - endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(i+1))); assertTrue(endpoints.contains(hosts.get(i+2))); @@ -404,21 +402,21 @@ public class LeaveAndBootstrapTest } // token 35 goes to nodes 4, 5 and boot1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3))); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(4))); assertTrue(endpoints.contains(hosts.get(5))); assertTrue(endpoints.contains(boot1)); // token 45 goes to nodes 5, boot1 and node7 - endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(5))); assertTrue(endpoints.contains(boot1)); assertTrue(endpoints.contains(hosts.get(7))); // token 55 goes to boot1, 7, boot2, 8 and 0 - endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5))).endpoints(); assertEquals(5, endpoints.size()); assertTrue(endpoints.contains(boot1)); assertTrue(endpoints.contains(hosts.get(7))); @@ -427,7 +425,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(hosts.get(0))); // token 65 goes to nodes 7, boot2, 8, 0 and 1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6))).endpoints(); assertEquals(5, endpoints.size()); assertTrue(endpoints.contains(hosts.get(7))); assertTrue(endpoints.contains(boot2)); @@ -436,7 +434,7 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(hosts.get(1))); // token 75 goes to nodes boot2, 8, 0, 1 and 2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7))).endpoints(); assertEquals(5, endpoints.size()); assertTrue(endpoints.contains(boot2)); assertTrue(endpoints.contains(hosts.get(8))); @@ -445,14 +443,14 @@ public class LeaveAndBootstrapTest assertTrue(endpoints.contains(hosts.get(2))); // token 85 goes to nodes 0, 1 and 2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(0))); assertTrue(endpoints.contains(hosts.get(1))); assertTrue(endpoints.contains(hosts.get(2))); // token 95 goes to nodes 0, 1 and 2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9))); + endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9))).endpoints(); assertEquals(3, endpoints.size()); assertTrue(endpoints.contains(hosts.get(0))); assertTrue(endpoints.contains(hosts.get(1))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 7321fba..731a25d 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -23,13 +23,21 @@ import java.net.UnknownHostException; import java.util.*; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.RangesByEndpoint; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaCollection; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.schema.MigrationManager; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -492,24 +500,25 @@ public class MoveTest tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host); } - private Map.Entry<Range<Token>, Collection<InetAddressAndPort>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException + private Map.Entry<Range<Token>, EndpointsForRange> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException { - Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new HashMap<>(); - pendingRanges.put(generateRange(start, end), makeAddrs(endpoints)); + Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>(); + Range<Token> range = generateRange(start, end); + pendingRanges.put(range, makeReplicas(range, endpoints)); return pendingRanges.entrySet().iterator().next(); } - private Map<Range<Token>, Collection<InetAddressAndPort>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddressAndPort>>... entries) + private Map<Range<Token>, EndpointsForRange> generatePendingRanges(Map.Entry<Range<Token>, EndpointsForRange>... entries) { - Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges = new HashMap<>(); - for(Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : entries) + Map<Range<Token>, EndpointsForRange> pendingRanges = new HashMap<>(); + for(Map.Entry<Range<Token>, EndpointsForRange> entry : entries) { pendingRanges.put(entry.getKey(), entry.getValue()); } return pendingRanges; } - private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddressAndPort>> pendingRanges, String keyspaceName) throws ConfigurationException + private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, EndpointsForRange> pendingRanges, String keyspaceName) throws ConfigurationException { boolean keyspaceFound = false; for (String nonSystemKeyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) @@ -523,15 +532,15 @@ public class MoveTest assert keyspaceFound; } - private void assertMaps(Map<Range<Token>, Collection<InetAddressAndPort>> expected, PendingRangeMaps actual) + private void assertMaps(Map<Range<Token>, EndpointsForRange> expected, PendingRangeMaps actual) { int sizeOfActual = 0; - Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator = actual.iterator(); + Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator = actual.iterator(); while(iterator.hasNext()) { - Map.Entry<Range<Token>, List<InetAddressAndPort>> actualEntry = iterator.next(); + Map.Entry<Range<Token>, EndpointsForRange.Mutable> actualEntry = iterator.next(); assertNotNull(expected.get(actualEntry.getKey())); - assertEquals(new HashSet<>(expected.get(actualEntry.getKey())), new HashSet<>(actualEntry.getValue())); + assertEquals(ImmutableSet.copyOf(expected.get(actualEntry.getKey())), ImmutableSet.copyOf(actualEntry.getValue())); sizeOfActual++; } @@ -589,9 +598,9 @@ public class MoveTest int numMoved = 0; for (Token token : keyTokens) { - int replicationFactor = strategy.getReplicationFactor(); + int replicationFactor = strategy.getReplicationFactor().allReplicas; - HashSet<InetAddressAndPort> actual = new HashSet<>(tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalEndpoints(token, tmd.cloneOnlyTokenMap()))); + EndpointsForToken actual = tmd.getWriteEndpoints(token, keyspaceName, strategy.calculateNaturalReplicas(token, tmd.cloneOnlyTokenMap()).forToken(token)); HashSet<InetAddressAndPort> expected = new HashSet<>(); for (int i = 0; i < replicationFactor; i++) @@ -600,10 +609,10 @@ public class MoveTest } if (expected.size() == actual.size()) { - assertEquals("mismatched endpoint sets", expected, actual); + assertEquals("mismatched endpoint sets", expected, actual.endpoints()); } else { expected.add(hosts.get(MOVING_NODE)); - assertEquals("mismatched endpoint sets", expected, actual); + assertEquals("mismatched endpoint sets", expected, actual.endpoints()); numMoved++; } } @@ -648,8 +657,6 @@ public class MoveTest newTokens.put(movingIndex, newToken); } - Collection<InetAddressAndPort> endpoints; - tmd = tmd.cloneAfterAllSettled(); ss.setTokenMetadataUnsafe(tmd); @@ -693,37 +700,18 @@ public class MoveTest * } */ - Multimap<InetAddressAndPort, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressRanges(); - Collection<Range<Token>> ranges1 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.1")); - assertEquals(1, collectionSize(ranges1)); - assertEquals(generateRange(97, 0), ranges1.iterator().next()); - Collection<Range<Token>> ranges2 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.2")); - assertEquals(1, collectionSize(ranges2)); - assertEquals(generateRange(0, 10), ranges2.iterator().next()); - Collection<Range<Token>> ranges3 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.3")); - assertEquals(1, collectionSize(ranges3)); - assertEquals(generateRange(10, 20), ranges3.iterator().next()); - Collection<Range<Token>> ranges4 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.4")); - assertEquals(1, collectionSize(ranges4)); - assertEquals(generateRange(20, 30), ranges4.iterator().next()); - Collection<Range<Token>> ranges5 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.5")); - assertEquals(1, collectionSize(ranges5)); - assertEquals(generateRange(30, 40), ranges5.iterator().next()); - Collection<Range<Token>> ranges6 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.6")); - assertEquals(1, collectionSize(ranges6)); - assertEquals(generateRange(40, 50), ranges6.iterator().next()); - Collection<Range<Token>> ranges7 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.7")); - assertEquals(1, collectionSize(ranges7)); - assertEquals(generateRange(50, 67), ranges7.iterator().next()); - Collection<Range<Token>> ranges8 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.8")); - assertEquals(1, collectionSize(ranges8)); - assertEquals(generateRange(67, 70), ranges8.iterator().next()); - Collection<Range<Token>> ranges9 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.9")); - assertEquals(1, collectionSize(ranges9)); - assertEquals(generateRange(70, 87), ranges9.iterator().next()); - Collection<Range<Token>> ranges10 = keyspace1ranges.get(InetAddressAndPort.getByName("127.0.0.10")); - assertEquals(1, collectionSize(ranges10)); - assertEquals(generateRange(87, 97), ranges10.iterator().next()); + RangesByEndpoint keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressReplicas(); + + assertRanges(keyspace1ranges, "127.0.0.1", 97, 0); + assertRanges(keyspace1ranges, "127.0.0.2", 0, 10); + assertRanges(keyspace1ranges, "127.0.0.3", 10, 20); + assertRanges(keyspace1ranges, "127.0.0.4", 20, 30); + assertRanges(keyspace1ranges, "127.0.0.5", 30, 40); + assertRanges(keyspace1ranges, "127.0.0.6", 40, 50); + assertRanges(keyspace1ranges, "127.0.0.7", 50, 67); + assertRanges(keyspace1ranges, "127.0.0.8", 67, 70); + assertRanges(keyspace1ranges, "127.0.0.9", 70, 87); + assertRanges(keyspace1ranges, "127.0.0.10", 87, 97); /** @@ -742,37 +730,17 @@ public class MoveTest * } */ - Multimap<InetAddressAndPort, Range<Token>> keyspace3ranges = keyspaceStrategyMap.get(KEYSPACE3).getAddressRanges(); - ranges1 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.1")); - assertEquals(collectionSize(ranges1), 5); - assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 50, 67, 87, 97, 67, 70))); - ranges2 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.2")); - assertEquals(collectionSize(ranges2), 5); - assertTrue(ranges2.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 67, 70))); - ranges3 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.3")); - assertEquals(collectionSize(ranges3), 5); - assertTrue(ranges3.equals(generateRanges(97, 0, 70, 87, 87, 97, 0, 10, 10, 20))); - ranges4 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.4")); - assertEquals(collectionSize(ranges4), 5); - assertTrue(ranges4.equals(generateRanges(97, 0, 20, 30, 87, 97, 0, 10, 10, 20))); - ranges5 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.5")); - assertEquals(collectionSize(ranges5), 5); - assertTrue(ranges5.equals(generateRanges(97, 0, 30, 40, 20, 30, 0, 10, 10, 20))); - ranges6 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.6")); - assertEquals(collectionSize(ranges6), 5); - assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30, 0, 10, 10, 20))); - ranges7 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.7")); - assertEquals(collectionSize(ranges7), 5); - assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 30, 10, 20))); - ranges8 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.8")); - assertEquals(collectionSize(ranges8), 5); - assertTrue(ranges8.equals(generateRanges(40, 50, 30, 40, 50, 67, 20, 30, 67, 70))); - ranges9 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.9")); - assertEquals(collectionSize(ranges9), 5); - assertTrue(ranges9.equals(generateRanges(40, 50, 70, 87, 30, 40, 50, 67, 67, 70))); - ranges10 = keyspace3ranges.get(InetAddressAndPort.getByName("127.0.0.10")); - assertEquals(collectionSize(ranges10), 5); - assertTrue(ranges10.equals(generateRanges(40, 50, 70, 87, 50, 67, 87, 97, 67, 70))); + RangesByEndpoint keyspace3ranges = keyspaceStrategyMap.get(KEYSPACE3).getAddressReplicas(); + assertRanges(keyspace3ranges, "127.0.0.1", 97, 0, 70, 87, 50, 67, 87, 97, 67, 70); + assertRanges(keyspace3ranges, "127.0.0.2", 97, 0, 70, 87, 87, 97, 0, 10, 67, 70); + assertRanges(keyspace3ranges, "127.0.0.3", 97, 0, 70, 87, 87, 97, 0, 10, 10, 20); + assertRanges(keyspace3ranges, "127.0.0.4", 97, 0, 20, 30, 87, 97, 0, 10, 10, 20); + assertRanges(keyspace3ranges, "127.0.0.5", 97, 0, 30, 40, 20, 30, 0, 10, 10, 20); + assertRanges(keyspace3ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30, 0, 10, 10, 20); + assertRanges(keyspace3ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67, 20, 30, 10, 20); + assertRanges(keyspace3ranges, "127.0.0.8", 40, 50, 30, 40, 50, 67, 20, 30, 67, 70); + assertRanges(keyspace3ranges, "127.0.0.9", 40, 50, 70, 87, 30, 40, 50, 67, 67, 70); + assertRanges(keyspace3ranges, "127.0.0.10", 40, 50, 70, 87, 50, 67, 87, 97, 67, 70); /** @@ -790,37 +758,18 @@ public class MoveTest * /127.0.0.10=[(70,87], (87,97], (67,70]] * } */ - Multimap<InetAddressAndPort, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressRanges(); - ranges1 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.1")); - assertEquals(collectionSize(ranges1), 3); - assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 87, 97))); - ranges2 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.2")); - assertEquals(collectionSize(ranges2), 3); - assertTrue(ranges2.equals(generateRanges(97, 0, 87, 97, 0, 10))); - ranges3 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.3")); - assertEquals(collectionSize(ranges3), 3); - assertTrue(ranges3.equals(generateRanges(97, 0, 0, 10, 10, 20))); - ranges4 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.4")); - assertEquals(collectionSize(ranges4), 3); - assertTrue(ranges4.equals(generateRanges(20, 30, 0, 10, 10, 20))); - ranges5 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.5")); - assertEquals(collectionSize(ranges5), 3); - assertTrue(ranges5.equals(generateRanges(30, 40, 20, 30, 10, 20))); - ranges6 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.6")); - assertEquals(collectionSize(ranges6), 3); - assertTrue(ranges6.equals(generateRanges(40, 50, 30, 40, 20, 30))); - ranges7 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.7")); - assertEquals(collectionSize(ranges7), 3); - assertTrue(ranges7.equals(generateRanges(40, 50, 30, 40, 50, 67))); - ranges8 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.8")); - assertEquals(collectionSize(ranges8), 3); - assertTrue(ranges8.equals(generateRanges(40, 50, 50, 67, 67, 70))); - ranges9 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.9")); - assertEquals(collectionSize(ranges9), 3); - assertTrue(ranges9.equals(generateRanges(70, 87, 50, 67, 67, 70))); - ranges10 = keyspace4ranges.get(InetAddressAndPort.getByName("127.0.0.10")); - assertEquals(collectionSize(ranges10), 3); - assertTrue(ranges10.equals(generateRanges(70, 87, 87, 97, 67, 70))); + RangesByEndpoint keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressReplicas(); + + assertRanges(keyspace4ranges, "127.0.0.1", 97, 0, 70, 87, 87, 97); + assertRanges(keyspace4ranges, "127.0.0.2", 97, 0, 87, 97, 0, 10); + assertRanges(keyspace4ranges, "127.0.0.3", 97, 0, 0, 10, 10, 20); + assertRanges(keyspace4ranges, "127.0.0.4", 20, 30, 0, 10, 10, 20); + assertRanges(keyspace4ranges, "127.0.0.5", 30, 40, 20, 30, 10, 20); + assertRanges(keyspace4ranges, "127.0.0.6", 40, 50, 30, 40, 20, 30); + assertRanges(keyspace4ranges, "127.0.0.7", 40, 50, 30, 40, 50, 67); + assertRanges(keyspace4ranges, "127.0.0.8", 40, 50, 50, 67, 67, 70); + assertRanges(keyspace4ranges, "127.0.0.9", 70, 87, 50, 67, 67, 70); + assertRanges(keyspace4ranges, "127.0.0.10", 70, 87, 87, 97, 67, 70); // pre-calculate the results. Map<String, Multimap<Token, InetAddressAndPort>> expectedEndpoints = new HashMap<>(); @@ -876,79 +825,80 @@ public class MoveTest for (Token token : keyTokens) { - endpoints = tmd.getWriteEndpoints(token, keyspaceName, strategy.getNaturalEndpoints(token)); + Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(token, keyspaceName, strategy.getNaturalReplicasForToken(token)).endpoints(); assertEquals(expectedEndpoints.get(keyspaceName).get(token).size(), endpoints.size()); assertTrue(expectedEndpoints.get(keyspaceName).get(token).containsAll(endpoints)); } // just to be sure that things still work according to the old tests, run them: - if (strategy.getReplicationFactor() != 3) + if (strategy.getReplicationFactor().allReplicas != 3) continue; + ReplicaCollection<?> replicas = null; // tokens 5, 15 and 25 should go three nodes for (int i = 0; i < 3; i++) { - endpoints = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(i))); - assertEquals(3, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(i+1))); - assertTrue(endpoints.contains(hosts.get(i+2))); - assertTrue(endpoints.contains(hosts.get(i+3))); + replicas = tmd.getWriteEndpoints(keyTokens.get(i), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(i))); + assertEquals(3, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(i + 1))); + assertTrue(replicas.endpoints().contains(hosts.get(i + 2))); + assertTrue(replicas.endpoints().contains(hosts.get(i + 3))); } // token 35 should go to nodes 4, 5, 6 and boot1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(3))); - assertEquals(4, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(4))); - assertTrue(endpoints.contains(hosts.get(5))); - assertTrue(endpoints.contains(hosts.get(6))); - assertTrue(endpoints.contains(boot1)); + replicas = tmd.getWriteEndpoints(keyTokens.get(3), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(3))); + assertEquals(4, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(4))); + assertTrue(replicas.endpoints().contains(hosts.get(5))); + assertTrue(replicas.endpoints().contains(hosts.get(6))); + assertTrue(replicas.endpoints().contains(boot1)); // token 45 should go to nodes 5, 6, 7 boot1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(4))); - assertEquals(4, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(5))); - assertTrue(endpoints.contains(hosts.get(6))); - assertTrue(endpoints.contains(hosts.get(7))); - assertTrue(endpoints.contains(boot1)); + replicas = tmd.getWriteEndpoints(keyTokens.get(4), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(4))); + assertEquals(4, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(5))); + assertTrue(replicas.endpoints().contains(hosts.get(6))); + assertTrue(replicas.endpoints().contains(hosts.get(7))); + assertTrue(replicas.endpoints().contains(boot1)); // token 55 should go to nodes 6, 7, 8 boot1 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(5))); - assertEquals(5, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(6))); - assertTrue(endpoints.contains(hosts.get(7))); - assertTrue(endpoints.contains(hosts.get(8))); - assertTrue(endpoints.contains(boot1)); - assertTrue(endpoints.contains(boot2)); + replicas = tmd.getWriteEndpoints(keyTokens.get(5), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(5))); + assertEquals(5, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(6))); + assertTrue(replicas.endpoints().contains(hosts.get(7))); + assertTrue(replicas.endpoints().contains(hosts.get(8))); + assertTrue(replicas.endpoints().contains(boot1)); + assertTrue(replicas.endpoints().contains(boot2)); // token 65 should go to nodes 6, 7, 8 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(6))); - assertEquals(4, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(6))); - assertTrue(endpoints.contains(hosts.get(7))); - assertTrue(endpoints.contains(hosts.get(8))); - assertTrue(endpoints.contains(boot2)); + replicas = tmd.getWriteEndpoints(keyTokens.get(6), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(6))); + assertEquals(4, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(6))); + assertTrue(replicas.endpoints().contains(hosts.get(7))); + assertTrue(replicas.endpoints().contains(hosts.get(8))); + assertTrue(replicas.endpoints().contains(boot2)); // token 75 should to go nodes 8, 9, 0 and boot2 - endpoints = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(7))); - assertEquals(4, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(8))); - assertTrue(endpoints.contains(hosts.get(9))); - assertTrue(endpoints.contains(hosts.get(0))); - assertTrue(endpoints.contains(boot2)); + replicas = tmd.getWriteEndpoints(keyTokens.get(7), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(7))); + assertEquals(4, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(8))); + assertTrue(replicas.endpoints().contains(hosts.get(9))); + assertTrue(replicas.endpoints().contains(hosts.get(0))); + assertTrue(replicas.endpoints().contains(boot2)); // token 85 should go to nodes 8, 9 and 0 - endpoints = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(8))); - assertEquals(3, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(8))); - assertTrue(endpoints.contains(hosts.get(9))); - assertTrue(endpoints.contains(hosts.get(0))); + replicas = tmd.getWriteEndpoints(keyTokens.get(8), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(8))); + assertEquals(3, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(8))); + assertTrue(replicas.endpoints().contains(hosts.get(9))); + assertTrue(replicas.endpoints().contains(hosts.get(0))); // token 95 should go to nodes 9, 0 and 1 - endpoints = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalEndpoints(keyTokens.get(9))); - assertEquals(3, endpoints.size()); - assertTrue(endpoints.contains(hosts.get(9))); - assertTrue(endpoints.contains(hosts.get(0))); - assertTrue(endpoints.contains(hosts.get(1))); + replicas = tmd.getWriteEndpoints(keyTokens.get(9), keyspaceName, strategy.getNaturalReplicasForToken(keyTokens.get(9))); + assertEquals(3, replicas.size()); + assertTrue(replicas.endpoints().contains(hosts.get(9))); + assertTrue(replicas.endpoints().contains(hosts.get(0))); + assertTrue(replicas.endpoints().contains(hosts.get(1))); } // all moving nodes are back to the normal state @@ -1009,6 +959,14 @@ public class MoveTest return addrs; } + private static EndpointsForRange makeReplicas(Range<Token> range, String... hosts) throws UnknownHostException + { + EndpointsForRange.Builder replicas = EndpointsForRange.builder(range, hosts.length); + for (String host : hosts) + replicas.add(Replica.fullReplica(InetAddressAndPort.getByName(host), range)); + return replicas.build(); + } + private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd) { KeyspaceMetadata ksmd = Schema.instance.getKeyspaceMetadata(keyspaceName); @@ -1025,7 +983,7 @@ public class MoveTest return new BigIntegerToken(String.valueOf(10 * position + 7)); } - private int collectionSize(Collection<?> collection) + private static int collectionSize(Collection<?> collection) { if (collection.isEmpty()) return 0; @@ -1057,8 +1015,52 @@ public class MoveTest return ranges; } - private Range<Token> generateRange(int left, int right) + private static Token tk(int v) + { + return new BigIntegerToken(String.valueOf(v)); + } + + private static Range<Token> generateRange(int left, int right) + { + return new Range<Token>(tk(left), tk(right)); + } + + private static Replica replica(InetAddressAndPort endpoint, int left, int right, boolean full) + { + return new Replica(endpoint, tk(left), tk(right), full); + } + + private static InetAddressAndPort inet(String name) { - return new Range<Token>(new BigIntegerToken(String.valueOf(left)), new BigIntegerToken(String.valueOf(right))); + try + { + return InetAddressAndPort.getByName(name); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } + + private static Replica replica(InetAddressAndPort endpoint, int left, int right) + { + return replica(endpoint, left, right, true); + } + + private static void assertRanges(RangesByEndpoint epReplicas, String endpoint, int... rangePairs) + { + if (rangePairs.length % 2 == 1) + throw new RuntimeException("assertRanges argument count should be even"); + + InetAddressAndPort ep = inet(endpoint); + List<Replica> expected = new ArrayList<>(rangePairs.length/2); + for (int i=0; i<rangePairs.length; i+=2) + expected.add(replica(ep, rangePairs[i], rangePairs[i+1])); + + RangesAtEndpoint actual = epReplicas.get(ep); + assertEquals(expected.size(), actual.size()); + for (Replica replica : expected) + if (!actual.contains(replica)) + assertEquals(RangesAtEndpoint.copyOf(expected), actual); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/MoveTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTransientTest.java b/test/unit/org/apache/cassandra/service/MoveTransientTest.java new file mode 100644 index 0000000..1e24735 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/MoveTransientTest.java @@ -0,0 +1,638 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import com.google.common.base.Predicate; +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.RangeStreamer; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractEndpointSnitch; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.service.StorageServiceTest.assertMultimapEqualsIgnoreOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This is also fairly effectively testing source retrieval for bootstrap as well since RangeStreamer + * is used to calculate the endpoints to fetch from and check they are alive for both RangeRelocator (move) and + * bootstrap (RangeRelocator). + */ +public class MoveTransientTest +{ + private static final Logger logger = LoggerFactory.getLogger(MoveTransientTest.class); + + static InetAddressAndPort aAddress; + static InetAddressAndPort bAddress; + static InetAddressAndPort cAddress; + static InetAddressAndPort dAddress; + static InetAddressAndPort eAddress; + + @BeforeClass + public static void setUpClass() throws Exception + { + aAddress = InetAddressAndPort.getByName("127.0.0.1"); + bAddress = InetAddressAndPort.getByName("127.0.0.2"); + cAddress = InetAddressAndPort.getByName("127.0.0.3"); + dAddress = InetAddressAndPort.getByName("127.0.0.4"); + eAddress = InetAddressAndPort.getByName("127.0.0.5"); + } + + private final List<InetAddressAndPort> downNodes = new ArrayList(); + Predicate<Replica> alivePredicate = replica -> !downNodes.contains(replica.endpoint()); + + private final List<InetAddressAndPort> sourceFilterDownNodes = new ArrayList<>(); + private final Collection<Predicate<Replica>> sourceFilters = Collections.singleton(replica -> !sourceFilterDownNodes.contains(replica.endpoint())); + + @After + public void clearDownNode() + { + downNodes.clear(); + sourceFilterDownNodes.clear(); + } + + @BeforeClass + public static void setupDD() + { + DatabaseDescriptor.daemonInitialization(); + } + + Token oneToken = new RandomPartitioner.BigIntegerToken("1"); + Token twoToken = new RandomPartitioner.BigIntegerToken("2"); + Token threeToken = new RandomPartitioner.BigIntegerToken("3"); + Token fourToken = new RandomPartitioner.BigIntegerToken("4"); + Token sixToken = new RandomPartitioner.BigIntegerToken("6"); + Token sevenToken = new RandomPartitioner.BigIntegerToken("7"); + Token nineToken = new RandomPartitioner.BigIntegerToken("9"); + Token elevenToken = new RandomPartitioner.BigIntegerToken("11"); + Token fourteenToken = new RandomPartitioner.BigIntegerToken("14"); + + Range<Token> aRange = new Range(oneToken, threeToken); + Range<Token> bRange = new Range(threeToken, sixToken); + Range<Token> cRange = new Range(sixToken, nineToken); + Range<Token> dRange = new Range(nineToken, elevenToken); + Range<Token> eRange = new Range(elevenToken, oneToken); + + + RangesAtEndpoint current = RangesAtEndpoint.of(new Replica(aAddress, aRange, true), + new Replica(aAddress, eRange, true), + new Replica(aAddress, dRange, false)); + + + /** + * Ring with start A 1-3 B 3-6 C 6-9 D 9-1 + * A's token moves from 3 to 4. + * <p> + * Result is A gains some range + * + * @throws Exception + */ + @Test + public void testCalculateStreamAndFetchRangesMoveForward() throws Exception + { + calculateStreamAndFetchRangesMoveForward(); + } + + private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveForward() throws Exception + { + Range<Token> aPrimeRange = new Range<>(oneToken, fourToken); + + RangesAtEndpoint updated = RangesAtEndpoint.of( + new Replica(aAddress, aPrimeRange, true), + new Replica(aAddress, eRange, true), + new Replica(aAddress, dRange, false) + ); + + Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + assertContentsIgnoreOrder(result.left); + assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, fourToken)); + return result; + } + + /** + * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1 + * A's token moves from 3 to 14 + * <p> + * Result is A loses range and it must be streamed + * + * @throws Exception + */ + @Test + public void testCalculateStreamAndFetchRangesMoveBackwardBetween() throws Exception + { + calculateStreamAndFetchRangesMoveBackwardBetween(); + } + + public Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveBackwardBetween() throws Exception + { + Range<Token> aPrimeRange = new Range<>(elevenToken, fourteenToken); + + RangesAtEndpoint updated = RangesAtEndpoint.of( + new Replica(aAddress, aPrimeRange, true), + new Replica(aAddress, dRange, true), + new Replica(aAddress, cRange, false) + ); + + + Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + assertContentsIgnoreOrder(result.left, fullReplica(aAddress, oneToken, threeToken), fullReplica(aAddress, fourteenToken, oneToken)); + assertContentsIgnoreOrder(result.right, transientReplica(aAddress, sixToken, nineToken), fullReplica(aAddress, nineToken, elevenToken)); + return result; + } + + /** + * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1 + * A's token moves from 3 to 2 + * + * Result is A loses range and it must be streamed + * @throws Exception + */ + @Test + public void testCalculateStreamAndFetchRangesMoveBackward() throws Exception + { + calculateStreamAndFetchRangesMoveBackward(); + } + + private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveBackward() throws Exception + { + Range<Token> aPrimeRange = new Range<>(oneToken, twoToken); + + RangesAtEndpoint updated = RangesAtEndpoint.of( + new Replica(aAddress, aPrimeRange, true), + new Replica(aAddress, eRange, true), + new Replica(aAddress, dRange, false) + ); + + Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + + //Moving backwards has no impact on any replica. We already fully replicate counter clockwise + //The transient replica does transiently replicate slightly more, but that is addressed by cleanup + assertContentsIgnoreOrder(result.left, fullReplica(aAddress, twoToken, threeToken)); + assertContentsIgnoreOrder(result.right); + + return result; + } + + /** + * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1 + * A's moves from 3 to 7 + * + * @throws Exception + */ + private Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRangesMoveForwardBetween() throws Exception + { + Range<Token> aPrimeRange = new Range<>(sixToken, sevenToken); + Range<Token> bPrimeRange = new Range<>(oneToken, sixToken); + + + RangesAtEndpoint updated = RangesAtEndpoint.of( + new Replica(aAddress, aPrimeRange, true), + new Replica(aAddress, bPrimeRange, true), + new Replica(aAddress, eRange, false) + ); + + Pair<RangesAtEndpoint, RangesAtEndpoint> result = StorageService.calculateStreamAndFetchRanges(current, updated); + + assertContentsIgnoreOrder(result.left, fullReplica(aAddress, elevenToken, oneToken), transientReplica(aAddress, nineToken, elevenToken)); + assertContentsIgnoreOrder(result.right, fullReplica(aAddress, threeToken, sixToken), fullReplica(aAddress, sixToken, sevenToken)); + return result; + } + + /** + * Ring with start A 1-3 B 3-6 C 6-9 D 9-11 E 11-1 + * A's token moves from 3 to 7 + * + * @throws Exception + */ + @Test + public void testCalculateStreamAndFetchRangesMoveForwardBetween() throws Exception + { + calculateStreamAndFetchRangesMoveForwardBetween(); + } + + /** + * Construct the ring state for calculateStreamAndFetchRangesMoveBackwardBetween + * Where are A moves from 3 to 14 + * @return + */ + private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackwardBetween() + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(aRange.right, aAddress); + tmd.updateNormalToken(bRange.right, bAddress); + tmd.updateNormalToken(cRange.right, cAddress); + tmd.updateNormalToken(dRange.right, dAddress); + tmd.updateNormalToken(eRange.right, eAddress); + tmd.addMovingEndpoint(fourteenToken, aAddress); + TokenMetadata updated = tmd.cloneAfterAllSettled(); + + return Pair.create(tmd, updated); + } + + + /** + * Construct the ring state for calculateStreamAndFetchRangesMoveForwardBetween + * Where are A moves from 3 to 7 + * @return + */ + private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForwardBetween() + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(aRange.right, aAddress); + tmd.updateNormalToken(bRange.right, bAddress); + tmd.updateNormalToken(cRange.right, cAddress); + tmd.updateNormalToken(dRange.right, dAddress); + tmd.updateNormalToken(eRange.right, eAddress); + tmd.addMovingEndpoint(sevenToken, aAddress); + TokenMetadata updated = tmd.cloneAfterAllSettled(); + + return Pair.create(tmd, updated); + } + + private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveBackward() + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(aRange.right, aAddress); + tmd.updateNormalToken(bRange.right, bAddress); + tmd.updateNormalToken(cRange.right, cAddress); + tmd.updateNormalToken(dRange.right, dAddress); + tmd.updateNormalToken(eRange.right, eAddress); + tmd.addMovingEndpoint(twoToken, aAddress); + TokenMetadata updated = tmd.cloneAfterAllSettled(); + + return Pair.create(tmd, updated); + } + + private Pair<TokenMetadata, TokenMetadata> constructTMDsMoveForward() + { + TokenMetadata tmd = new TokenMetadata(); + tmd.updateNormalToken(aRange.right, aAddress); + tmd.updateNormalToken(bRange.right, bAddress); + tmd.updateNormalToken(cRange.right, cAddress); + tmd.updateNormalToken(dRange.right, dAddress); + tmd.updateNormalToken(eRange.right, eAddress); + tmd.addMovingEndpoint(fourToken, aAddress); + TokenMetadata updated = tmd.cloneAfterAllSettled(); + + return Pair.create(tmd, updated); + } + + + @Test + public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception + { + EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + + InetAddressAndPort cOrB = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress; + + //Need to pull the full replica and the transient replica that is losing the range + expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), fullReplica(dAddress, sixToken, nineToken)); + expectedResult.put(fullReplica(aAddress, sixToken, sevenToken), transientReplica(eAddress, sixToken, nineToken)); + + //Same need both here as well + expectedResult.put(fullReplica(aAddress, threeToken, sixToken), fullReplica(cOrB, threeToken, sixToken)); + expectedResult.put(fullReplica(aAddress, threeToken, sixToken), transientReplica(dAddress, threeToken, sixToken)); + + invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().right, + constructTMDsMoveForwardBetween(), + expectedResult.asImmutableView()); + } + + @Test + public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception + { + for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress}) + { + downNodes.clear(); + downNodes.add(downNode); + boolean threw = false; + try + { + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + catch (IllegalStateException ise) + { + ise.printStackTrace(); + assertTrue(downNode.toString(), + ise.getMessage().startsWith("A node required to move the data consistently is down:") + && ise.getMessage().contains(downNode.toString())); + threw = true; + } + assertTrue("Didn't throw for " + downNode, threw); + } + + //Shouldn't throw because another full replica is available + downNodes.clear(); + downNodes.add(cAddress); + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + @Test + public void testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception + { + for (InetAddressAndPort downNode : new InetAddressAndPort[] {dAddress, eAddress}) + { + sourceFilterDownNodes.clear(); + sourceFilterDownNodes.add(downNode); + boolean threw = false; + try + { + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + catch (IllegalStateException ise) + { + ise.printStackTrace(); + assertTrue(downNode.toString(), + ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:") + && ise.getMessage().contains(downNode.toString())); + threw = true; + } + assertTrue("Didn't throw for " + downNode, threw); + } + + //Shouldn't throw because another full replica is available + sourceFilterDownNodes.clear(); + sourceFilterDownNodes.add(cAddress); + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + @Test + public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints() throws Exception + { + EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + + //Need to pull the full replica and the transient replica that is losing the range + expectedResult.put(fullReplica(aAddress, nineToken, elevenToken), fullReplica(eAddress, nineToken, elevenToken)); + expectedResult.put(transientReplica(aAddress, sixToken, nineToken), transientReplica(eAddress, sixToken, nineToken)); + + invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().right, + constructTMDsMoveBackwardBetween(), + expectedResult.asImmutableView()); + + } + + @Test(expected = IllegalStateException.class) + public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception + { + //Any replica can be the full replica so this will always fail on the transient range + downNodes.add(eAddress); + testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + @Test(expected = IllegalStateException.class) + public void testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception + { + //Any replica can be the full replica so this will always fail on the transient range + sourceFilterDownNodes.add(eAddress); + testMoveBackwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + + //There is no down node version of this test because nothing needs to be fetched + @Test + public void testMoveBackwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception + { + //Moving backwards should fetch nothing and fetch ranges is emptys so this doesn't test a ton + EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + + invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().right, + constructTMDsMoveBackward(), + expectedResult.asImmutableView()); + + } + + @Test + public void testMoveForwardCalculateRangesToFetchWithPreferredEndpoints() throws Exception + { + EndpointsByReplica.Mutable expectedResult = new EndpointsByReplica.Mutable(); + + InetAddressAndPort cOrBAddress = (downNodes.contains(cAddress) || sourceFilterDownNodes.contains(cAddress)) ? bAddress : cAddress; + + //Need to pull the full replica and the transient replica that is losing the range + expectedResult.put(fullReplica(aAddress, threeToken, fourToken), fullReplica(cOrBAddress, threeToken, sixToken)); + expectedResult.put(fullReplica(aAddress, threeToken, fourToken), transientReplica(dAddress, threeToken, sixToken)); + + invokeCalculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().right, + constructTMDsMoveForward(), + expectedResult.asImmutableView()); + + } + + @Test + public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodes() throws Exception + { + downNodes.add(dAddress); + boolean threw = false; + try + { + testMoveForwardCalculateRangesToFetchWithPreferredEndpoints(); + } + catch (IllegalStateException ise) + { + ise.printStackTrace(); + assertTrue(dAddress.toString(), + ise.getMessage().startsWith("A node required to move the data consistently is down:") + && ise.getMessage().contains(dAddress.toString())); + threw = true; + } + assertTrue("Didn't throw for " + dAddress, threw); + + //Shouldn't throw because another full replica is available + downNodes.clear(); + downNodes.add(cAddress); + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + @Test + public void testMoveForwardCalculateRangesToFetchWithPreferredEndpointsDownNodesSourceFilter() throws Exception + { + sourceFilterDownNodes.add(dAddress); + boolean threw = false; + try + { + testMoveForwardCalculateRangesToFetchWithPreferredEndpoints(); + } + catch (IllegalStateException ise) + { + ise.printStackTrace(); + assertTrue(dAddress.toString(), + ise.getMessage().startsWith("Necessary replicas for strict consistency were removed by source filters:") + && ise.getMessage().contains(dAddress.toString())); + threw = true; + } + assertTrue("Didn't throw for " + dAddress, threw); + + //Shouldn't throw because another full replica is available + sourceFilterDownNodes.clear(); + sourceFilterDownNodes.add(cAddress); + testMoveForwardBetweenCalculateRangesToFetchWithPreferredEndpoints(); + } + + private void invokeCalculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint toFetch, + Pair<TokenMetadata, TokenMetadata> tmds, + EndpointsByReplica expectedResult) + { + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + + EndpointsByReplica result = RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> replicas.sorted((a, b) -> b.endpoint().compareTo(a.endpoint())), + simpleStrategy(tmds.left), + toFetch, + true, + tmds.left, + tmds.right, + alivePredicate, + "OldNetworkTopologyStrategyTest", + sourceFilters); + logger.info("Ranges to fetch with preferred endpoints"); + logger.info(result.toString()); + assertMultimapEqualsIgnoreOrder(expectedResult, result); + + } + + private AbstractReplicationStrategy simpleStrategy(TokenMetadata tmd) + { + IEndpointSnitch snitch = new AbstractEndpointSnitch() + { + public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2) + { + return 0; + } + + public String getRack(InetAddressAndPort endpoint) + { + return "R1"; + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + return "DC1"; + } + }; + + return new SimpleStrategy("MoveTransientTest", + tmd, + snitch, + com.google.common.collect.ImmutableMap.of("replication_factor", "3/1")); + } + + @Test + public void testMoveForwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception + { + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + + //Need to pull the full replica and the transient replica that is losing the range + expectedResult.put(bAddress, transientReplica(bAddress, nineToken, elevenToken)); + expectedResult.put(bAddress, fullReplica(bAddress, elevenToken, oneToken)); + + invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForwardBetween().left, + constructTMDsMoveForwardBetween(), + expectedResult.asImmutableView()); + } + + @Test + public void testMoveBackwardBetweenCalculateRangesToStreamWithPreferredEndpoints() throws Exception + { + RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + + expectedResult.put(bAddress, fullReplica(bAddress, fourteenToken, oneToken)); + + expectedResult.put(dAddress, transientReplica(dAddress, oneToken, threeToken)); + + expectedResult.put(cAddress, fullReplica(cAddress, oneToken, threeToken)); + expectedResult.put(cAddress, transientReplica(cAddress, fourteenToken, oneToken)); + + invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackwardBetween().left, + constructTMDsMoveBackwardBetween(), + expectedResult.asImmutableView()); + } + + @Test + public void testMoveBackwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception + { + RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + expectedResult.put(cAddress, fullReplica(cAddress, twoToken, threeToken)); + expectedResult.put(dAddress, transientReplica(dAddress, twoToken, threeToken)); + + invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveBackward().left, + constructTMDsMoveBackward(), + expectedResult.asImmutableView()); + } + + @Test + public void testMoveForwardCalculateRangesToStreamWithPreferredEndpoints() throws Exception + { + //Nothing to stream moving forward because we are acquiring more range not losing range + RangesByEndpoint.Mutable expectedResult = new RangesByEndpoint.Mutable(); + + invokeCalculateRangesToStreamWithPreferredEndpoints(calculateStreamAndFetchRangesMoveForward().left, + constructTMDsMoveForward(), + expectedResult.asImmutableView()); + } + + private void invokeCalculateRangesToStreamWithPreferredEndpoints(RangesAtEndpoint toStream, + Pair<TokenMetadata, TokenMetadata> tmds, + RangesByEndpoint expectedResult) + { + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); + StorageService.RangeRelocator relocator = new StorageService.RangeRelocator(); + RangesByEndpoint result = relocator.calculateRangesToStreamWithEndpoints(toStream, + simpleStrategy(tmds.left), + tmds.left, + tmds.right); + logger.info("Ranges to stream by endpoint"); + logger.info(result.toString()); + assertMultimapEqualsIgnoreOrder(expectedResult, result); + } + + private static void assertContentsIgnoreOrder(RangesAtEndpoint ranges, Replica ... replicas) + { + assertEquals(ranges.size(), replicas.length); + for (Replica replica : replicas) + if (!ranges.contains(replica)) + assertEquals(RangesAtEndpoint.of(replicas), ranges); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org