gossip host ID; Maintain a mapping of endpoint to ID. Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4120
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad685c46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad685c46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad685c46 Branch: refs/heads/trunk Commit: ad685c4615b08725488fdf26c1dd248cfe196cf8 Parents: 712ffeb Author: Eric Evans <[email protected]> Authored: Wed May 2 18:47:13 2012 -0500 Committer: Eric Evans <[email protected]> Committed: Wed May 2 18:47:13 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/VersionedValue.java | 33 +++++++-- .../apache/cassandra/locator/TokenMetadata.java | 52 +++++++++++++ .../org/apache/cassandra/net/MessagingService.java | 3 +- .../apache/cassandra/service/StorageService.java | 56 +++++++++++++- .../cassandra/service/StorageServiceMBean.java | 7 ++ src/java/org/apache/cassandra/tools/NodeCmd.java | 18 +++++ src/java/org/apache/cassandra/tools/NodeProbe.java | 10 +++ test/unit/org/apache/cassandra/Util.java | 12 +++- .../org/apache/cassandra/dht/BootStrapperTest.java | 20 +++++- .../apache/cassandra/gms/SerializationsTest.java | 3 +- .../cassandra/service/LeaveAndBootstrapTest.java | 43 +++++++----- .../org/apache/cassandra/service/MoveTest.java | 21 +++--- .../org/apache/cassandra/service/RemoveTest.java | 4 +- 13 files changed, 238 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 36ff1d9..25225c5 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -24,7 +24,9 @@ import 
java.util.UUID; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.commons.lang.StringUtils; /** @@ -88,6 +90,11 @@ public class VersionedValue implements Comparable<VersionedValue> return "Value(" + value + "," + version + ")"; } + private static String versionString(String...args) + { + return StringUtils.join(args, VersionedValue.DELIMITER); + } + public static class VersionedValueFactory { final IPartitioner partitioner; @@ -97,14 +104,18 @@ public class VersionedValue implements Comparable<VersionedValue> this.partitioner = partitioner; } - public VersionedValue bootstrapping(Token token) + public VersionedValue bootstrapping(Token token, UUID hostId) { - return new VersionedValue(VersionedValue.STATUS_BOOTSTRAPPING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING, + hostId.toString(), + partitioner.getTokenFactory().toString(token))); } - public VersionedValue normal(Token token) + public VersionedValue normal(Token token, UUID hostId) { - return new VersionedValue(VersionedValue.STATUS_NORMAL + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL, + hostId.toString(), + partitioner.getTokenFactory().toString(token))); } public VersionedValue load(double load) @@ -189,7 +200,19 @@ public class VersionedValue implements Comparable<VersionedValue> { public void serialize(VersionedValue value, DataOutput dos, int version) throws IOException { - dos.writeUTF(value.value); + String outValue = value.value; + + if (version < MessagingService.VERSION_12) + { + String[] pieces = value.value.split(DELIMITER_STR, -1); + if ((pieces[0] == STATUS_NORMAL) || pieces[0] == 
STATUS_BOOTSTRAPPING) + { + assert pieces.length >= 3; + outValue = versionString(pieces[0], pieces[2]); + } + } + + dos.writeUTF(outValue); dos.writeInt(value.version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 7e72eb4..1cb2a61 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.service.StorageService; public class TokenMetadata @@ -42,6 +43,9 @@ public class TokenMetadata /* Maintains token to endpoint map of every node in the cluster. */ private final BiMap<Token, InetAddress> tokenToEndpointMap; + /* Maintains endpoint to host ID map of every node in the cluster */ + private final BiMap<InetAddress, UUID> endpointToHostIdMap; + // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges<tt>, // which was added to when a node began bootstrap and removed from when it finished. // @@ -93,6 +97,7 @@ public class TokenMetadata if (tokenToEndpointMap == null) tokenToEndpointMap = HashBiMap.create(); this.tokenToEndpointMap = tokenToEndpointMap; + endpointToHostIdMap = HashBiMap.create(); sortedTokens = sortTokens(); } @@ -172,6 +177,51 @@ public class TokenMetadata } } + /** + * Store an end-point to host ID mapping. Each ID must be unique, and + * cannot be changed after the fact. 
+ * + * @param hostId + * @param endpoint + */ + public void updateHostId(UUID hostId, InetAddress endpoint) + { + assert hostId != null; + assert endpoint != null; + + InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId); + if (storedEp != null) + { + if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp))) + { + throw new RuntimeException(String.format("Host ID collision between active endpoint %s and %s (id=%s)", + storedEp, + endpoint, + hostId)); + } + } + + UUID storedId = endpointToHostIdMap.get(endpoint); + if ((storedId != null) && (!storedId.equals(hostId))) + logger.warn("Changing {}'s host ID from {} to {}", new Object[] {endpoint, storedId, hostId}); + + endpointToHostIdMap.forcePut(endpoint, hostId); + } + + /** Return the unique host ID for an end-point. */ + public UUID getHostId(InetAddress endpoint) + { + return endpointToHostIdMap.get(endpoint); + } + + /** @return a copy of the endpoint-to-id map for read-only operations */ + public Map<InetAddress, UUID> getEndpointToHostIdMapForReading() + { + Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>(); + readMap.putAll(endpointToHostIdMap); + return readMap; + } + public void addBootstrapToken(Token token, InetAddress endpoint) { assert token != null; @@ -260,6 +310,7 @@ public class TokenMetadata bootstrapTokens.inverse().remove(endpoint); tokenToEndpointMap.inverse().remove(endpoint); leavingEndpoints.remove(endpoint); + endpointToHostIdMap.remove(endpoint); sortedTokens = sortTokens(); invalidateCaches(); } @@ -607,6 +658,7 @@ public class TokenMetadata tokenToEndpointMap.clear(); leavingEndpoints.clear(); pendingRanges.clear(); + endpointToHostIdMap.clear(); invalidateCaches(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java index 96fac99..c9a928e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -70,7 +70,8 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_080 = 2; public static final int VERSION_10 = 3; public static final int VERSION_11 = 4; - public static final int current_version = VERSION_11; + public static final int VERSION_12 = 5; + public static final int current_version = VERSION_12; static SerializerType serializerType = SerializerType.BINARY; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6b195c5..64ec4b6 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -252,7 +252,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.debug("Setting token to {}", token); SystemTable.updateToken(token); tokenMetadata.updateNormalToken(token, FBUtilities.getBroadcastAddress()); - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(getLocalToken())); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, + valueFactory.normal(getLocalToken(), SystemTable.getLocalHostId())); setMode(Mode.NORMAL, false); } @@ -533,6 +534,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.info("Starting up server gossip"); joined = true; + // Seed the host ID-to-endpoint map with our own ID. 
+ getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress()); // have to start the gossip service before we can see any info on other nodes. this is necessary // for bootstrap to get the load info it needs. @@ -759,7 +762,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (null == DatabaseDescriptor.getReplaceToken()) { // if not an existing token then bootstrap - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.bootstrapping(token)); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, + valueFactory.bootstrapping(token, SystemTable.getLocalHostId())); setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true); try { @@ -972,6 +976,19 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe return mapString; } + public String getLocalHostId() + { + return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString(); + } + + public Map<String, String> getHostIdMap() + { + Map<String, String> mapOut = new HashMap<String, String>(); + for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet()) + mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString()); + return mapOut; + } + /** * Construct the range to endpoint mapping based on the true view * of the world. @@ -1074,7 +1091,19 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private void handleStateBootstrap(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Token token = getPartitioner().getTokenFactory().fromString(pieces[1]); + + // Parse versioned values according to end-point version: + // versions < 1.2 .....: STATUS,TOKEN + // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... 
+ int tokenPos; + if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + { + assert pieces.length >= 3; + tokenPos = 2; + } + else tokenPos = 1; + + Token token = getPartitioner().getTokenFactory().fromString(pieces[tokenPos]); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state bootstrapping, token " + token); @@ -1096,6 +1125,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe tokenMetadata.addBootstrapToken(token, endpoint); calculatePendingRanges(); + + if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); } /** @@ -1108,7 +1140,20 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private void handleStateNormal(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Token token = getPartitioner().getTokenFactory().fromString(pieces[1]); + + // Parse versioned values according to end-point version: + // versions < 1.2 .....: STATUS,TOKEN + // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... 
+ int tokensPos; + if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + { + assert pieces.length >= 3; + tokensPos = 2; + } + else + tokensPos = 1; + + Token token = getPartitioner().getTokenFactory().fromString(pieces[tokensPos]); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state normal, token " + token); @@ -1150,6 +1195,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe tokenMetadata.removeFromMoving(endpoint); calculatePendingRanges(); + + if (Gossiper.instance.getVersion(endpoint) >= MessagingService.VERSION_12) + tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 12f714c..2af4215 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -154,6 +155,12 @@ public interface StorageServiceMBean */ public Map<String, String> getTokenToEndpointMap(); + /** Retrieve this hosts unique ID */ + public String getLocalHostId(); + + /** Retrieve the mapping of endpoint to host ID */ + public Map<String, String> getHostIdMap(); + /** * Numeric load value. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index c7ecd7d..4f2fd8b 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -94,6 +94,7 @@ public class NodeCmd GETCOMPACTIONTHRESHOLD, GETENDPOINTS, GOSSIPINFO, + IDS, INFO, INVALIDATEKEYCACHE, INVALIDATEROWCACHE, @@ -137,6 +138,7 @@ public class NodeCmd addCmdHelp(header, "join", "Join the ring"); addCmdHelp(header, "info", "Print node informations (uptime, load, ...)"); addCmdHelp(header, "cfstats", "Print statistics on column families"); + addCmdHelp(header, "ids", "Print list of unique host IDs"); addCmdHelp(header, "version", "Print cassandra version"); addCmdHelp(header, "tpstats", "Print usage statistics of thread pools"); addCmdHelp(header, "proxyhistograms", "Print statistic histograms for network operations"); @@ -282,6 +284,20 @@ public class NodeCmd } } + /** Writes a table of host IDs to a PrintStream */ + public void printHostIds(PrintStream outs) + { + System.out.print(String.format("%-16s %-7s %s%n", "Address", "Status", "Host ID")); + for (Map.Entry<String, String> entry : probe.getHostIdMap().entrySet()) + { + String status; + if (probe.getLiveNodes().contains(entry.getKey())) status = "Up"; + else if (probe.getUnreachableNodes().contains(entry.getKey())) status = "Down"; + else status = "?"; + System.out.print(String.format("%-16s %-7s %s%n", entry.getKey(), status, entry.getValue())); + } + } + public void printThreadPoolStats(PrintStream outs) { outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked"); @@ -315,6 +331,7 @@ public class NodeCmd { boolean gossipInitialized = probe.isInitialized(); outs.printf("%-17s: %s%n", 
"Token", probe.getToken()); + outs.printf("%-17s: %s%n", "ID", probe.getLocalHostId()); outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized); outs.printf("%-17s: %s%n", "Load", probe.getLoadString()); if (gossipInitialized) @@ -720,6 +737,7 @@ public class NodeCmd case ENABLETHRIFT : probe.startThriftServer(); break; case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; case RESETLOCALSCHEMA: probe.resetLocalSchema(); break; + case IDS : nodeCmd.printHostIds(System.out); break; case DRAIN : try { probe.drain(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 5dad98d..e342cf2 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -313,6 +313,16 @@ public class NodeProbe return ssProxy.getToken(); } + public String getLocalHostId() + { + return ssProxy.getLocalHostId(); + } + + public Map<String, String> getHostIdMap() + { + return ssProxy.getHostIdMap(); + } + public String getLoadString() { return ssProxy.getLoadString(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index fd5259a..a55787c 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -50,6 +51,8 @@ import static 
org.junit.Assert.assertTrue; public class Util { + private static List<UUID> hostIdPool = new ArrayList<UUID>(); + public static DecoratedKey dk(String key) { return StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(key)); @@ -208,19 +211,24 @@ public class Util * Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal' */ public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens, - List<Token> keyTokens, List<InetAddress> hosts, int howMany) + List<Token> keyTokens, List<InetAddress> hosts, List<UUID> hostIds, int howMany) throws UnknownHostException { + // Expand pool of host IDs as necessary + for (int i = hostIdPool.size(); i < howMany; i++) + hostIdPool.add(UUID.randomUUID()); + for (int i=0; i<howMany; i++) { endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i))); keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5))); + hostIds.add(hostIdPool.get(i)); } for (int i=0; i<endpointTokens.size(); i++) { InetAddress ep = InetAddress.getByName("127.0.0." 
+ String.valueOf(i + 1)); - ss.onChange(ep, ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i))); + ss.onChange(ep, ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i), hostIds.get(i))); hosts.add(ep); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 56eba35..6e9236f 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.HashMap; import java.util.Set; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.junit.Test; @@ -75,6 +76,13 @@ public class BootStrapperTest extends SchemaLoader InetAddress.getByName("127.0.0.14"), InetAddress.getByName("127.0.0.15"), }; + UUID[] bootstrapHostIds = new UUID[] + { + UUID.randomUUID(), + UUID.randomUUID(), + UUID.randomUUID(), + UUID.randomUUID(), + }; Map<InetAddress, Double> load = new HashMap<InetAddress, Double>(); for (int i = 0; i < addrs.length; i++) { @@ -93,7 +101,9 @@ public class BootStrapperTest extends SchemaLoader Range<Token> range = ss.getPrimaryRangeForEndpoint(bootstrapSource); Token token = StorageService.getPartitioner().midpoint(range.left, range.right); assert range.contains(token); - ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS, StorageService.instance.valueFactory.bootstrapping(token)); + ss.onChange(bootstrapAddrs[i], + ApplicationState.STATUS, + StorageService.instance.valueFactory.bootstrapping(token, bootstrapHostIds[i])); } // any further attempt to bootsrtap should fail since every node in the cluster is splitting. 
@@ -110,7 +120,9 @@ public class BootStrapperTest extends SchemaLoader // indicate that one of the nodes is done. see if the node it was bootstrapping from is still available. Range<Token> range = ss.getPrimaryRangeForEndpoint(addrs[2]); Token token = StorageService.getPartitioner().midpoint(range.left, range.right); - ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS, StorageService.instance.valueFactory.normal(token)); + ss.onChange(bootstrapAddrs[2], + ApplicationState.STATUS, + StorageService.instance.valueFactory.normal(token, bootstrapHostIds[2])); load.put(bootstrapAddrs[2], 0d); InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load); assert addr != null && addr.equals(addrs[2]); @@ -142,7 +154,9 @@ public class BootStrapperTest extends SchemaLoader Range<Token> range5 = ss.getPrimaryRangeForEndpoint(five); Token fakeToken = StorageService.getPartitioner().midpoint(range5.left, range5.right); assert range5.contains(fakeToken); - ss.onChange(myEndpoint, ApplicationState.STATUS, StorageService.instance.valueFactory.bootstrapping(fakeToken)); + ss.onChange(myEndpoint, + ApplicationState.STATUS, + StorageService.instance.valueFactory.bootstrapping(fakeToken, UUID.randomUUID())); tmd = ss.getTokenMetadata(); InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/gms/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java index 0010465..4598f71 100644 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; public class SerializationsTest extends 
AbstractSerializationsTester { @@ -101,7 +102,7 @@ public class SerializationsTest extends AbstractSerializationsTester private static EndpointState EndpointSt = new EndpointState(HeartbeatSt); private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner()); private static VersionedValue vv0 = vvFact.load(23d); - private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken()); + private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken(), UUID.randomUUID()); private static List<GossipDigest> Digests = new ArrayList<GossipDigest>(); { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index faa9e18..be53b53 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -82,8 +82,9 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>(); for (String table : Schema.instance.getNonSystemTables()) @@ -149,9 +150,10 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); 
List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 10 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); // nodes 6, 8 and 9 leave final int[] LEAVING = new int[] {6, 8, 9}; @@ -160,9 +162,10 @@ public class LeaveAndBootstrapTest // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); - ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5))); + UUID boot1Id = UUID.randomUUID(); + ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), boot1Id)); InetAddress boot2 = InetAddress.getByName("127.0.1.2"); - ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7))); + ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID())); Collection<InetAddress> endpoints = null; @@ -318,7 +321,7 @@ public class LeaveAndBootstrapTest valueFactory.left(endpointTokens.get(LEAVING[0]), Gossiper.computeExpireTime())); ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(LEAVING[2]), Gossiper.computeExpireTime())); - ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5))); + ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5), boot1Id)); // adjust precalcuated results. this changes what the epected endpoints are. 
expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); @@ -434,9 +437,10 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 5 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7); // node 2 leaves ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2))); @@ -448,14 +452,14 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().isEmpty()); // Bootstrap the node immedidiately to keyTokens.get(4) without going through STATE_LEFT - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(4))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(4), hostIds.get(2))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2))); // Bootstrap node hosts.get(3) to keyTokens.get(1) - ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1))); + ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3))); assertFalse(tmd.isMember(hosts.get(3))); assertFalse(tmd.isLeaving(hosts.get(3))); @@ -463,7 +467,7 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Bootstrap node hosts.get(2) further to keyTokens.get(3) - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(3))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(3), hostIds.get(2))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -472,8 +476,8 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Go to normal again for both nodes - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(3))); - ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(3), hostIds.get(2))); + ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(3))); assertTrue(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -497,9 +501,10 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 5 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6); // node 2 leaves ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2))); @@ -508,7 +513,7 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2))); // back to normal - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(2))); assertTrue(tmd.getLeavingEndpoints().isEmpty()); assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2))); @@ -517,7 +522,7 @@ public class LeaveAndBootstrapTest ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(2))); 
ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(2), Gossiper.computeExpireTime())); - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4), hostIds.get(2))); assertTrue(tmd.getBootstrapTokens().isEmpty()); assertTrue(tmd.getLeavingEndpoints().isEmpty()); @@ -536,9 +541,10 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 5 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6); // node 2 leaves with _different_ token ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(0))); @@ -548,7 +554,7 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null); // go to boostrap - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().size() == 1); @@ -581,9 +587,10 @@ public class LeaveAndBootstrapTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring of 6 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7); // node hosts.get(2) goes jumps to left ss.onChange(hosts.get(2), 
ApplicationState.STATUS, @@ -592,7 +599,7 @@ public class LeaveAndBootstrapTest assertFalse(tmd.isMember(hosts.get(2))); // node hosts.get(4) goes to bootstrap - ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1))); + ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3))); assertFalse(tmd.isMember(hosts.get(3))); assertTrue(tmd.getBootstrapTokens().size() == 1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index cbdad15..b44fa32 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -86,8 +86,9 @@ public class MoveTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, List<InetAddress>>(); for (String table : Schema.instance.getNonSystemTables()) @@ -133,7 +134,7 @@ public class MoveTest } // moving endpoint back to the normal state - ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(newToken)); + ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(MOVING_NODE))); } /* @@ -152,9 +153,10 @@ public class MoveTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = 
new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 10 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); // nodes 6, 8 and 9 leave final int[] MOVING = new int[] {6, 8, 9}; @@ -177,9 +179,9 @@ public class MoveTest // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); - ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5))); + ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), UUID.randomUUID())); InetAddress boot2 = InetAddress.getByName("127.0.1.2"); - ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7))); + ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID())); // don't require test update every time a new keyspace is added to test/conf/cassandra.yaml Map<String, AbstractReplicationStrategy> tableStrategyMap = new HashMap<String, AbstractReplicationStrategy>(); @@ -465,7 +467,7 @@ public class MoveTest // all moving nodes are back to the normal state for (Integer movingIndex : MOVING) { - ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, valueFactory.normal(newTokens.get(movingIndex))); + ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, valueFactory.normal(newTokens.get(movingIndex), hostIds.get(movingIndex))); } } @@ -481,9 +483,10 @@ public class MoveTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); // create a ring or 6 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); + Util.createInitialRing(ss, partitioner, endpointTokens, 
keyTokens, hosts, hostIds, 6); // node 2 leaves Token newToken = positionToken(7); @@ -493,7 +496,7 @@ public class MoveTest assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2))); // back to normal - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken)); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2))); assertTrue(tmd.getMovingEndpoints().isEmpty()); assertTrue(tmd.getToken(hosts.get(2)).equals(newToken)); @@ -501,7 +504,7 @@ public class MoveTest newToken = positionToken(8); // node 2 goes through leave and left and then jumps to normal at its new token ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.moving(newToken)); - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken)); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2))); assertTrue(tmd.getBootstrapTokens().isEmpty()); assertTrue(tmd.getMovingEndpoints().isEmpty()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 910267a..e56e291 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; @@ -57,6 +58,7 @@ public class RemoveTest ArrayList<Token> endpointTokens = new ArrayList<Token>(); ArrayList<Token> keyTokens = new ArrayList<Token>(); List<InetAddress> hosts = new ArrayList<InetAddress>(); + List<UUID> hostIds = new ArrayList<UUID>(); InetAddress removalhost; Token 
removaltoken; @@ -80,7 +82,7 @@ public class RemoveTest tmd.clearUnsafe(); // create a ring of 5 nodes - Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 6); + Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6); MessagingService.instance().listen(FBUtilities.getBroadcastAddress()); Gossiper.instance.start(1);
