Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=906521&r1=906520&r2=906521&view=diff ============================================================================== --- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original) +++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Thu Feb 4 15:21:31 2010 @@ -24,8 +24,13 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.commons.lang.StringUtils; import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import org.apache.cassandra.dht.IPartitioner; @@ -40,6 +45,16 @@ public class MoveTest { + // handy way of creating a mapping of strategies to use in StorageService. + private static Map<String, AbstractReplicationStrategy> createReplacements(AbstractReplicationStrategy strat) + { + Map<String, AbstractReplicationStrategy> replacements = new HashMap<String, AbstractReplicationStrategy>(); + for (String table : DatabaseDescriptor.getTables()) + replacements.put(table, strat); + return replacements; + } + + /** * Test whether write endpoints is correct when the node is leaving. Uses * StorageService.onChange and does not manipulate token metadata directly. @@ -48,53 +63,74 @@ public void testWriteEndPointsDuringLeave() throws UnknownHostException { StorageService ss = StorageService.instance; + final int RING_SIZE = 5; + final int LEAVING_NODE = 2; TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategies = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); - createInitialRing(ss, partitioner, endPointTokens, keyTokens, hosts, 5); + createInitialRing(ss, partitioner, endPointTokens, keyTokens, hosts, RING_SIZE); + final Map<String, List<Range>> deadNodesRanges = new HashMap<String, List<Range>>(); + for (String table : DatabaseDescriptor.getNonSystemTables()) + { + List<Range> list = new ArrayList<Range>(); + list.addAll(testStrategy.getAddressRanges(table).get(hosts.get(LEAVING_NODE))); + Collections.sort(list); + deadNodesRanges.put(table, list); + } + // Third node leaves - ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(2)))); + ss.onChange(hosts.get(LEAVING_NODE), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(LEAVING_NODE)))); // check that it is correctly marked as leaving in tmd - assertTrue(tmd.isLeaving(hosts.get(2))); + assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE))); // check that pending ranges are correct (primary range should go to 1st node, first // replica range to 4th node and 2nd replica range to 5th node) - assertTrue(tmd.getPendingRanges(hosts.get(0)).get(0).equals(new Range(endPointTokens.get(1), - endPointTokens.get(2)))); - assertTrue(tmd.getPendingRanges(hosts.get(3)).get(0).equals(new Range(endPointTokens.get(4), - endPointTokens.get(0)))); - assertTrue(tmd.getPendingRanges(hosts.get(4)).get(0).equals(new Range(endPointTokens.get(0), - endPointTokens.get(1)))); - - for (int i=0; i<keyTokens.size(); ++i) + for (String table : DatabaseDescriptor.getNonSystemTables()) { - Collection<InetAddress> endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), testStrategy.getNaturalEndpoints(keyTokens.get(i))); + int replicationFactor = DatabaseDescriptor.getReplicationFactor(table); - // Original third node does not store replicas for 4th and 5th node (ranges 20-30 - // and 30-40 respectively), so their write endpoints count should be still 3. The - // third node stores data for ranges 40-0, 0-10 and 10-20, so writes falling to - // these ranges should have four endpoints now. keyTokens[2] is 25 and keyTokens[3] - // is 35, so these are the ones that should have 3 endpoints. - if (i==2 || i==3) - assertTrue(endPoints.size() == 3); - else - assertTrue(endPoints.size() == 4); + // if the ring minus the leaving node leaves us with less than RF, we're hosed. + if (hosts.size()-1 < replicationFactor) + continue; + + // verify that the replicationFactor nodes after the leaving node are gatherings it's pending ranges. + // in the case where rf==5, we're screwed because we basically just lost data. + for (int i = 0; i < replicationFactor; i++) + { + assertTrue(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).size() > 0); + assertEquals(tmd.getPendingRanges(table, hosts.get((LEAVING_NODE + 1 + i) % RING_SIZE)).get(0), deadNodesRanges.get(table).get(i)); + } + + // note that we're iterating over nodes and sample tokens. + final int replicaStart = (LEAVING_NODE-replicationFactor+RING_SIZE)%RING_SIZE; + for (int i=0; i<keyTokens.size(); ++i) + { + Collection<InetAddress> endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); + // figure out if this node is one of the nodes previous to the failed node (2). + boolean isReplica = (i - replicaStart + RING_SIZE) % RING_SIZE < replicationFactor; + // the preceeding leaving_node-replication_factor nodes should have and additional ep (replication_factor+1); + if (isReplica) + assertTrue(endPoints.size() == replicationFactor + 1); + else + assertTrue(endPoints.size() == replicationFactor); + + } } ss.setPartitionerUnsafe(oldPartitioner); - ss.setReplicationStrategyUnsafe(oldStrategy); + ss.setReplicationStrategyUnsafe(oldStrategies); } /** @@ -105,25 +141,26 @@ public void testSimultaneousMove() throws UnknownHostException { StorageService ss = StorageService.instance; + final int RING_SIZE = 10; TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategy = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); // create a ring or 10 nodes - createInitialRing(ss, partitioner, endPointTokens, keyTokens, hosts, 10); + createInitialRing(ss, partitioner, endPointTokens, keyTokens, hosts, RING_SIZE); // nodes 6, 8 and 9 leave - ss.onChange(hosts.get(6), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(6)))); - ss.onChange(hosts.get(8), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(8)))); - ss.onChange(hosts.get(9), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(9)))); + final int[] LEAVING = new int[] { 6, 8, 9}; + for (int leaving : LEAVING) + ss.onChange(hosts.get(leaving), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(leaving)))); // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); @@ -133,151 +170,245 @@ Collection<InetAddress> endPoints = null; - // tokens 5, 15 and 25 should go three nodes - for (int i=0; i<3; ++i) + // pre-calculate the results. + Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>(); + expectedEndpoints.put("Keyspace1", HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.1.1")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2", "127.0.0.1")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1")); + expectedEndpoints.get("Keyspace1").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1")); + expectedEndpoints.put("Keyspace2", HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.1.1")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2", "127.0.0.1")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1")); + expectedEndpoints.get("Keyspace2").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1")); + expectedEndpoints.put("Keyspace3", HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1", "127.0.0.8")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.2", "127.0.0.1", "127.0.1.1")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2", "127.0.0.1", "127.0.0.2", "127.0.1.1")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.1.2", "127.0.0.1", "127.0.0.2", "127.0.1.1", "127.0.0.3")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.1.1", "127.0.1.2")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.1.2", "127.0.0.3", "127.0.0.4")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.1.2", "127.0.0.4", "127.0.0.5")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5")); + expectedEndpoints.get("Keyspace3").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5")); + expectedEndpoints.put("Keyspace4", HashMultimap.<Token, InetAddress>create()); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1", "127.0.0.8")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.2", "127.0.0.1", "127.0.1.1")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.1", "127.0.0.2", "127.0.1.1", "127.0.1.2")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.1.2", "127.0.0.1", "127.0.0.2")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2", "127.0.0.2", "127.0.0.3")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3")); + expectedEndpoints.get("Keyspace4").putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.1", "127.0.0.2", "127.0.0.3")); + + for (String table : DatabaseDescriptor.getNonSystemTables()) { - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), testStrategy.getNaturalEndpoints(keyTokens.get(i))); + for (int i = 0; i < keyTokens.size(); i++) + { + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); + assertTrue(expectedEndpoints.get(table).get(keyTokens.get(i)).size() == endPoints.size()); + assertTrue(expectedEndpoints.get(table).get(keyTokens.get(i)).containsAll(endPoints)); + } + + // just to be sure that things still work according to the old tests, run them: + if (DatabaseDescriptor.getReplicationFactor(table) != 3) + continue; + // tokens 5, 15 and 25 should go three nodes + for (int i=0; i<3; ++i) + { + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); + assertTrue(endPoints.size() == 3); + assertTrue(endPoints.contains(hosts.get(i+1))); + assertTrue(endPoints.contains(hosts.get(i+2))); + assertTrue(endPoints.contains(hosts.get(i+3))); + } + + // token 35 should go to nodes 4, 5, 6, 7 and boot1 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(3), table, testStrategy.getNaturalEndpoints(keyTokens.get(3), table)); + assertTrue(endPoints.size() == 5); + assertTrue(endPoints.contains(hosts.get(4))); + assertTrue(endPoints.contains(hosts.get(5))); + assertTrue(endPoints.contains(hosts.get(6))); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(boot1)); + + // token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(4), table, testStrategy.getNaturalEndpoints(keyTokens.get(4), table)); + assertTrue(endPoints.size() == 6); + assertTrue(endPoints.contains(hosts.get(5))); + assertTrue(endPoints.contains(hosts.get(6))); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(boot1)); + assertTrue(endPoints.contains(boot2)); + + // token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(5), table, testStrategy.getNaturalEndpoints(keyTokens.get(5), table)); + assertTrue(endPoints.size() == 7); + assertTrue(endPoints.contains(hosts.get(6))); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(boot1)); + assertTrue(endPoints.contains(boot2)); + + // token 65 should go to nodes 7, 8, 9, 0, 1 and boot2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(6), table, testStrategy.getNaturalEndpoints(keyTokens.get(6), table)); + assertTrue(endPoints.size() == 6); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(9))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(boot2)); + + // token 75 should to go nodes 8, 9, 0, 1, 2 and boot2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(7), table, testStrategy.getNaturalEndpoints(keyTokens.get(7), table)); + assertTrue(endPoints.size() == 6); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(9))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); + assertTrue(endPoints.contains(boot2)); + + // token 85 should go to nodes 9, 0, 1 and 2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(8), table, testStrategy.getNaturalEndpoints(keyTokens.get(8), table)); + assertTrue(endPoints.size() == 4); + assertTrue(endPoints.contains(hosts.get(9))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); + + // token 95 should go to nodes 0, 1 and 2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(9), table, testStrategy.getNaturalEndpoints(keyTokens.get(9), table)); assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(i+1))); - assertTrue(endPoints.contains(hosts.get(i+2))); - assertTrue(endPoints.contains(hosts.get(i+3))); - } + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); - // token 35 should go to nodes 4, 5, 6, 7 and boot1 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(3), testStrategy.getNaturalEndpoints(keyTokens.get(3))); - assertTrue(endPoints.size() == 5); - assertTrue(endPoints.contains(hosts.get(4))); - assertTrue(endPoints.contains(hosts.get(5))); - assertTrue(endPoints.contains(hosts.get(6))); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(boot1)); - - // token 45 should go to nodes 5, 6, 7, 0, boot1 and boot2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(4), testStrategy.getNaturalEndpoints(keyTokens.get(4))); - assertTrue(endPoints.size() == 6); - assertTrue(endPoints.contains(hosts.get(5))); - assertTrue(endPoints.contains(hosts.get(6))); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(boot1)); - assertTrue(endPoints.contains(boot2)); - - // token 55 should go to nodes 6, 7, 8, 0, 1, boot1 and boot2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(5), testStrategy.getNaturalEndpoints(keyTokens.get(5))); - assertTrue(endPoints.size() == 7); - assertTrue(endPoints.contains(hosts.get(6))); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(boot1)); - assertTrue(endPoints.contains(boot2)); - - // token 65 should go to nodes 7, 8, 9, 0, 1 and boot2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(6), testStrategy.getNaturalEndpoints(keyTokens.get(6))); - assertTrue(endPoints.size() == 6); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(9))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(boot2)); - - // token 75 should to go nodes 8, 9, 0, 1, 2 and boot2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(7), testStrategy.getNaturalEndpoints(keyTokens.get(7))); - assertTrue(endPoints.size() == 6); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(9))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); - assertTrue(endPoints.contains(boot2)); - - // token 85 should go to nodes 9, 0, 1 and 2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(8), testStrategy.getNaturalEndpoints(keyTokens.get(8))); - assertTrue(endPoints.size() == 4); - assertTrue(endPoints.contains(hosts.get(9))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); - - // token 95 should go to nodes 0, 1 and 2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(9), testStrategy.getNaturalEndpoints(keyTokens.get(9))); - assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); + } // Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still // leaving and boot2 in progress - ss.onChange(hosts.get(6), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(6)))); - ss.onChange(hosts.get(9), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(9)))); + ss.onChange(hosts.get(LEAVING[0]), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(LEAVING[0])))); + ss.onChange(hosts.get(LEAVING[2]), StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter + StorageService.LEFT_NORMALLY + StorageService.Delimiter + partitioner.getTokenFactory().toString(endPointTokens.get(LEAVING[2])))); ss.onChange(boot1, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter + partitioner.getTokenFactory().toString(keyTokens.get(5)))); - // tokens 5, 15 and 25 should go three nodes - for (int i=0; i<3; ++i) + // adjust precalcuated results. this changes what the epected endpoints are. + expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); + expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace2").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); + expectedEndpoints.get("Keyspace2").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("15")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("25")).removeAll(makeAddrs("127.0.0.7", "127.0.1.2", "127.0.0.1")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("35")).removeAll(makeAddrs("127.0.0.7", "127.0.0.2")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("45")).removeAll(makeAddrs("127.0.0.7", "127.0.0.10", "127.0.0.3")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.10", "127.0.0.4")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("65")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace3").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("35")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("45")).removeAll(makeAddrs("127.0.0.7", "127.0.1.2", "127.0.0.1")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.2", "127.0.0.7")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("65")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("75")).removeAll(makeAddrs("127.0.0.10")); + expectedEndpoints.get("Keyspace4").get(new BigIntegerToken("85")).removeAll(makeAddrs("127.0.0.10")); + + for (String table : DatabaseDescriptor.getNonSystemTables()) { - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), testStrategy.getNaturalEndpoints(keyTokens.get(i))); + for (int i = 0; i < keyTokens.size(); i++) + { + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); + assertTrue(expectedEndpoints.get(table).get(keyTokens.get(i)).size() == endPoints.size()); + assertTrue(expectedEndpoints.get(table).get(keyTokens.get(i)).containsAll(endPoints)); + } + + if (DatabaseDescriptor.getReplicationFactor(table) != 3) + continue; + // leave this stuff in to guarantee the old tests work the way they were supposed to. + // tokens 5, 15 and 25 should go three nodes + for (int i=0; i<3; ++i) + { + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(i), table, testStrategy.getNaturalEndpoints(keyTokens.get(i), table)); + assertTrue(endPoints.size() == 3); + assertTrue(endPoints.contains(hosts.get(i+1))); + assertTrue(endPoints.contains(hosts.get(i+2))); + assertTrue(endPoints.contains(hosts.get(i+3))); + } + + // token 35 goes to nodes 4, 5 and boot1 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(3), table, testStrategy.getNaturalEndpoints(keyTokens.get(3), table)); assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(i+1))); - assertTrue(endPoints.contains(hosts.get(i+2))); - assertTrue(endPoints.contains(hosts.get(i+3))); - } + assertTrue(endPoints.contains(hosts.get(4))); + assertTrue(endPoints.contains(hosts.get(5))); + assertTrue(endPoints.contains(boot1)); - // token 35 goes to nodes 4, 5 and boot1 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(3), testStrategy.getNaturalEndpoints(keyTokens.get(3))); - assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(4))); - assertTrue(endPoints.contains(hosts.get(5))); - assertTrue(endPoints.contains(boot1)); - - // token 45 goes to nodes 5, boot1 and node7 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(4), testStrategy.getNaturalEndpoints(keyTokens.get(4))); - assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(5))); - assertTrue(endPoints.contains(boot1)); - assertTrue(endPoints.contains(hosts.get(7))); - - // token 55 goes to boot1, 7, boot2, 8 and 0 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(5), testStrategy.getNaturalEndpoints(keyTokens.get(5))); - assertTrue(endPoints.size() == 5); - assertTrue(endPoints.contains(boot1)); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(boot2)); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(0))); - - // token 65 goes to nodes 7, boot2, 8, 0 and 1 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(6), testStrategy.getNaturalEndpoints(keyTokens.get(6))); - assertTrue(endPoints.size() == 5); - assertTrue(endPoints.contains(hosts.get(7))); - assertTrue(endPoints.contains(boot2)); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - - // token 75 goes to nodes boot2, 8, 0, 1 and 2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(7), testStrategy.getNaturalEndpoints(keyTokens.get(7))); - assertTrue(endPoints.size() == 5); - assertTrue(endPoints.contains(boot2)); - assertTrue(endPoints.contains(hosts.get(8))); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); - - // token 85 goes to nodes 0, 1 and 2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(8), testStrategy.getNaturalEndpoints(keyTokens.get(8))); - assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); - - // token 95 goes to nodes 0, 1 and 2 - endPoints = testStrategy.getWriteEndpoints(keyTokens.get(9), testStrategy.getNaturalEndpoints(keyTokens.get(9))); - assertTrue(endPoints.size() == 3); - assertTrue(endPoints.contains(hosts.get(0))); - assertTrue(endPoints.contains(hosts.get(1))); - assertTrue(endPoints.contains(hosts.get(2))); + // token 45 goes to nodes 5, boot1 and node7 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(4), table, testStrategy.getNaturalEndpoints(keyTokens.get(4), table)); + assertTrue(endPoints.size() == 3); + assertTrue(endPoints.contains(hosts.get(5))); + assertTrue(endPoints.contains(boot1)); + assertTrue(endPoints.contains(hosts.get(7))); + + // token 55 goes to boot1, 7, boot2, 8 and 0 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(5), table, testStrategy.getNaturalEndpoints(keyTokens.get(5), table)); + assertTrue(endPoints.size() == 5); + assertTrue(endPoints.contains(boot1)); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(boot2)); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(0))); + + // token 65 goes to nodes 7, boot2, 8, 0 and 1 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(6), table, testStrategy.getNaturalEndpoints(keyTokens.get(6), table)); + assertTrue(endPoints.size() == 5); + assertTrue(endPoints.contains(hosts.get(7))); + assertTrue(endPoints.contains(boot2)); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + + // token 75 goes to nodes boot2, 8, 0, 1 and 2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(7), table, testStrategy.getNaturalEndpoints(keyTokens.get(7), table)); + assertTrue(endPoints.size() == 5); + assertTrue(endPoints.contains(boot2)); + assertTrue(endPoints.contains(hosts.get(8))); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); + + // token 85 goes to nodes 0, 1 and 2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(8), table, testStrategy.getNaturalEndpoints(keyTokens.get(8), table)); + assertTrue(endPoints.size() == 3); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); + + // token 95 goes to nodes 0, 1 and 2 + endPoints = testStrategy.getWriteEndpoints(keyTokens.get(9), table, testStrategy.getNaturalEndpoints(keyTokens.get(9), table)); + assertTrue(endPoints.size() == 3); + assertTrue(endPoints.contains(hosts.get(0))); + assertTrue(endPoints.contains(hosts.get(1))); + assertTrue(endPoints.contains(hosts.get(2))); + } ss.setPartitionerUnsafe(oldPartitioner); ss.setReplicationStrategyUnsafe(oldStrategy); @@ -290,10 +421,10 @@ TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategy = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); @@ -359,10 +490,10 @@ TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategy = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); @@ -403,10 +534,10 @@ TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategy = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); @@ -453,10 +584,10 @@ TokenMetadata tmd = ss.getTokenMetadata(); tmd.clearUnsafe(); IPartitioner partitioner = new RandomPartitioner(); - AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, 3); + AbstractReplicationStrategy testStrategy = new RackUnawareStrategy(tmd, null); IPartitioner oldPartitioner = ss.setPartitionerUnsafe(partitioner); - AbstractReplicationStrategy oldStrategy = ss.setReplicationStrategyUnsafe(testStrategy); + Map<String, AbstractReplicationStrategy> oldStrategy = ss.setReplicationStrategyUnsafe(createReplacements(testStrategy)); ArrayList<Token> endPointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); @@ -513,4 +644,11 @@ assertTrue(ss.getTokenMetadata().isMember(hosts.get(i))); } + private static Collection<InetAddress> makeAddrs(String... hosts) throws UnknownHostException + { + ArrayList<InetAddress> addrs = new ArrayList<InetAddress>(hosts.length); + for (String host : hosts) + addrs.add(InetAddress.getByName(host)); + return addrs; + } }
