Updated Branches: refs/heads/trunk cf5a31ff9 -> b475bc69b
Binary encoding of tokens. Also promotes new gossip states: HOST_ID and TOKENS Patch by brandonwilliams, reviewed by eevans for CASSANDRA-4383 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b475bc69 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b475bc69 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b475bc69 Branch: refs/heads/trunk Commit: b475bc69b7e6d08e6d12527578552e67e8c0f88a Parents: cf5a31f Author: Brandon Williams <[email protected]> Authored: Tue Sep 4 10:41:58 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Tue Sep 4 10:41:58 2012 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/gms/ApplicationState.java | 2 + .../org/apache/cassandra/gms/FailureDetector.java | 4 + src/java/org/apache/cassandra/gms/Gossiper.java | 28 ++++- .../org/apache/cassandra/gms/TokenSerializer.java | 68 ++++++++++ .../org/apache/cassandra/gms/VersionedValue.java | 44 +++++-- .../apache/cassandra/service/StorageService.java | 105 +++++++-------- test/unit/org/apache/cassandra/Util.java | 5 +- .../org/apache/cassandra/dht/BootStrapperTest.java | 12 +- .../apache/cassandra/gms/SerializationsTest.java | 3 +- .../service/AntiEntropyServiceTestAbstract.java | 2 +- .../cassandra/service/LeaveAndBootstrapTest.java | 46 ++++--- .../org/apache/cassandra/service/MoveTest.java | 22 ++-- .../org/apache/cassandra/service/RemoveTest.java | 4 - 13 files changed, 234 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/ApplicationState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index f23a6fc..777dfc5 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -31,6 +31,8 @@ public enum ApplicationState X_11_PADDING, // padding specifically for 1.1 SEVERITY, NET_VERSION, + HOST_ID, + TOKENS, // pad to allow adding new states to existing cluster X1, X2, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/FailureDetector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java index c58a559..2b3905a 100644 --- a/src/java/org/apache/cassandra/gms/FailureDetector.java +++ b/src/java/org/apache/cassandra/gms/FailureDetector.java @@ -101,7 +101,11 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean private void appendEndpointState(StringBuilder sb, EndpointState endpointState) { for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet()) + { + if (state.getKey() == ApplicationState.TOKENS) + continue; sb.append(" ").append(state.getKey()).append(":").append(state.getValue().value).append("\n"); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index a771197..2404e40 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -618,6 +618,22 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return endpointStateMap.entrySet(); } + public boolean usesHostId(InetAddress endpoint) + { + if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) + return true; + else if (getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12) + return true; + return false; + } + + public UUID getHostId(InetAddress endpoint) + { + if (!usesHostId(endpoint)) + throw new RuntimeException("Host " + endpoint + " does not use new-style tokens!"); + return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value); + } + EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version) { EndpointState epState = endpointStateMap.get(forEndpoint); @@ -1075,7 +1091,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /** * This should *only* be used for testing purposes. */ - public void initializeNodeUnsafe(InetAddress addr, int generationNbr) { + public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr) { /* initialize the heartbeat state for this localEndpoint */ EndpointState localState = endpointStateMap.get(addr); if ( localState == null ) @@ -1087,6 +1103,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } // always add the version state localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion()); + localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid)); + } + + /** + * This should *only* be used for testing purposes + */ + public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value) + { + EndpointState localState = endpointStateMap.get(endpoint); + localState.addApplicationState(state, value); } public long getEndpointDowntime(String address) throws UnknownHostException http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/TokenSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/TokenSerializer.java b/src/java/org/apache/cassandra/gms/TokenSerializer.java new file mode 100644 index 0000000..b55967c --- /dev/null +++ b/src/java/org/apache/cassandra/gms/TokenSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.gms; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.ISerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; + + +public class TokenSerializer +{ + private static final Logger logger = LoggerFactory.getLogger(TokenSerializer.class); + + public static void serialize(IPartitioner partitioner, Collection<Token> tokens, DataOutput dos) throws IOException + { + for (Token<?> token : tokens) + { + byte[] bintoken = partitioner.getTokenFactory().toByteArray(token).array(); + dos.writeInt(bintoken.length); + dos.write(bintoken); + } + dos.writeInt(0); + } + + public static Collection<Token> deserialize(IPartitioner partitioner, DataInput dis) throws IOException + { + Collection<Token> tokens = new ArrayList<Token>(); + while (true) + { + int size = dis.readInt(); + if (size < 1) + break; + logger.trace("Reading token of {} bytes", size); + byte[] bintoken = new byte[size]; + dis.readFully(bintoken); + tokens.add(partitioner.getTokenFactory().fromByteArray(ByteBuffer.wrap(bintoken))); + } + return tokens; + } + + public static long serializedSize(Collection<Token> tokens, TypeSizes typeSizes) + { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index f3d5541..92c79eb 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -18,12 +18,16 @@ package org.apache.cassandra.gms; import java.io.*; + import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; +import com.google.common.collect.Iterables; +import static com.google.common.base.Charsets.ISO_8859_1; + import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; @@ -47,6 +51,7 @@ import org.apache.commons.lang.StringUtils; public class VersionedValue implements Comparable<VersionedValue> { + public static final IVersionedSerializer<VersionedValue> serializer = new VersionedValueSerializer(); // this must be a char that cannot be present in any token @@ -67,8 +72,6 @@ public class VersionedValue implements Comparable<VersionedValue> // values for ApplicationState.REMOVAL_COORDINATOR public final static String REMOVAL_COORDINATOR = "REMOVER"; - // network proto version from MS - public final static String NET_VERSION = "NET_VERSION"; public final int version; public final String value; @@ -110,26 +113,21 @@ public class VersionedValue implements Comparable<VersionedValue> this.partitioner = partitioner; } - public VersionedValue bootstrapping(Collection<Token> tokens, UUID hostId) + public VersionedValue bootstrapping(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING, - hostId.toString(), makeTokenString(tokens))); } - public VersionedValue normal(Collection<Token> tokens, UUID hostId) + public VersionedValue normal(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL, - hostId.toString(), makeTokenString(tokens))); } private String makeTokenString(Collection<Token> tokens) { - List<String> tokenStrings = new ArrayList<String>(); - for (Token<?> token : tokens) - tokenStrings.add(partitioner.getTokenFactory().toString(token)); - return StringUtils.join(tokenStrings, VersionedValue.DELIMITER); + return partitioner.getTokenFactory().toString(Iterables.get(tokens, 0)); } public VersionedValue load(double load) @@ -145,14 +143,14 @@ public class VersionedValue implements Comparable<VersionedValue> public VersionedValue leaving(Collection<Token> tokens) { return new VersionedValue(versionString(VersionedValue.STATUS_LEAVING, - makeTokenString(tokens))); + makeTokenString(tokens))); } public VersionedValue left(Collection<Token> tokens, long expireTime) { return new VersionedValue(versionString(VersionedValue.STATUS_LEFT, - Long.toString(expireTime), - makeTokenString(tokens))); + Long.toString(expireTime), + makeTokenString(tokens))); } public VersionedValue moving(Token token) @@ -160,6 +158,26 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } + public VersionedValue hostId(UUID hostId) + { + return new VersionedValue(hostId.toString()); + } + + public VersionedValue tokens(Collection<Token> tokens) + { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + try + { + TokenSerializer.serialize(partitioner, tokens, dos); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + return new VersionedValue(new String(bos.toByteArray(), ISO_8859_1)); + } + public VersionedValue removingNonlocal(UUID hostId) { return new VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0c6e7df..4f06f1c 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.service; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -29,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; +import static com.google.common.base.Charsets.ISO_8859_1; import com.google.common.collect.*; import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; @@ -180,8 +183,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.debug("Setting tokens to {}", tokens); SystemTable.updateTokens(tokens); tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, - valueFactory.normal(getLocalTokens(), SystemTable.getLocalHostId())); + // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. + Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(getLocalTokens())); + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.normal(getLocalTokens())); setMode(Mode.NORMAL, false); } @@ -495,6 +499,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering. // gossip network proto version Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); + Gossiper.instance.addLocalApplicationState(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId())); // gossip schema version when gossiper is running Schema.instance.updateVersionAndAnnounce(); // add rpc listening info @@ -803,8 +808,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe if (0 == DatabaseDescriptor.getReplaceTokens().size()) { // if not an existing token then bootstrap + // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. + Gossiper.instance.addLocalApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens)); Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, - valueFactory.bootstrapping(tokens, SystemTable.getLocalHostId())); + valueFactory.bootstrapping(tokens)); setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true); try { @@ -1105,19 +1112,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } } - /** - * Checks MS for the version, provided MS _really_ knows it (has directly communicated with the node) otherwise falls back to checking the gossipped version (learned about this node indirectly) - * If both fail, the node is too old to use hostid-style status serialization - * @param endpoint - * @return boolean whether or not to use hostid - */ - private boolean usesHostId(InetAddress endpoint) + private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate) { - if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12) - return true; - else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION) != null && Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value) >= MessagingService.VERSION_12) - return true; - return false; + String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value; + return vvalue.getBytes(ISO_8859_1); + } + + private Collection<Token> getTokensFor(InetAddress endpoint) + { + try + { + return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS)))); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } /** @@ -1132,18 +1142,13 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // Parse versioned values according to end-point version: // versions < 1.2 .....: STATUS,TOKEN - // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... - int tokenPos; - if (usesHostId(endpoint)) - { - assert pieces.length >= 3; - tokenPos = 2; - } - else tokenPos = 1; - - Collection<Token> tokens = new ArrayList<Token>(); - for (int i = tokenPos; i < pieces.length; ++i) - tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i])); + // versions >= 1.2 .....: use TOKENS app state + Collection<Token> tokens; + // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified + if (Gossiper.instance.usesHostId(endpoint) && Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.TOKENS) != null) + tokens = getTokensFor(endpoint); + else + tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1])); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state bootstrapping, token " + tokens); @@ -1166,8 +1171,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe tokenMetadata.addBootstrapTokens(tokens, endpoint); calculatePendingRanges(); - if (usesHostId(endpoint)) - tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); + if (Gossiper.instance.usesHostId(endpoint)) + tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); } /** @@ -1183,20 +1188,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // Parse versioned values according to end-point version: // versions < 1.2 .....: STATUS,TOKEN - // versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,... - int tokensPos; - if (usesHostId(endpoint)) - { - assert pieces.length >= 3; - tokensPos = 2; - } - else - tokensPos = 1; - logger.debug("Using token position {} for {}", tokensPos, endpoint); + // versions >= 1.2 .....: uses HOST_ID/TOKENS app states + + Collection<Token> tokens; - Collection<Token> tokens = new ArrayList<Token>(); - for (int i = tokensPos; i < pieces.length; ++i) - tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i])); + if (Gossiper.instance.usesHostId(endpoint)) + tokens = getTokensFor(endpoint); + else + tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1])); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state normal, token " + tokens); @@ -1205,8 +1204,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.info("Node " + endpoint + " state jump to normal"); // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). - if (usesHostId(endpoint)) - tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint); + if (Gossiper.instance.usesHostId(endpoint)) + tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint); Set<Token> tokensToUpdateInMetadata = new HashSet<Token>(); Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>(); @@ -1278,9 +1277,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private void handleStateLeaving(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Collection<Token> tokens = new ArrayList<Token>(); - for (int i = 1; i < pieces.length; ++i) - tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i])); + Collection<Token> tokens; + if (Gossiper.instance.usesHostId(endpoint)) + tokens = getTokensFor(endpoint); + else + tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1])); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state leaving, tokens " + tokens); @@ -1314,16 +1315,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private void handleStateLeft(InetAddress endpoint, String[] pieces) { assert pieces.length >= 2; - Collection<Token> tokens = null; + Collection<Token> tokens; Integer version = MessagingService.instance().getVersion(endpoint); - if (version < MessagingService.VERSION_12) + if (!Gossiper.instance.usesHostId(endpoint)) tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1])); else - { - tokens = new ArrayList<Token>(pieces.length - 2); - for (int i = 2; i < pieces.length; ++i) - tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i])); - } + tokens = getTokensFor(endpoint); if (logger.isDebugEnabled()) logger.debug("Node " + endpoint + " state left, tokens " + tokens); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 336755c..7a04f38 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -230,10 +230,11 @@ public class Util for (int i=0; i<endpointTokens.size(); i++) { InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1)); - Gossiper.instance.initializeNodeUnsafe(ep, 1); + Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1); + Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i)))); ss.onChange(ep, ApplicationState.STATUS, - new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)), hostIds.get(i))); + new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)))); hosts.add(ep); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 1e6d1cb..f7d1c7c 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -85,10 +85,10 @@ public class BootStrapperTest extends SchemaLoader Map<InetAddress, Double> load = new HashMap<InetAddress, Double>(); for (int i = 0; i < addrs.length; i++) { - Gossiper.instance.initializeNodeUnsafe(addrs[i], 1); + Gossiper.instance.initializeNodeUnsafe(addrs[i], UUID.randomUUID(), 1); load.put(addrs[i], (double)i+2); // also make bootstrapping nodes present in gossip - Gossiper.instance.initializeNodeUnsafe(bootstrapAddrs[i], 1); + Gossiper.instance.initializeNodeUnsafe(bootstrapAddrs[i], UUID.randomUUID(), 1); } // give every node a bootstrap source. @@ -102,9 +102,10 @@ public class BootStrapperTest extends SchemaLoader Range<Token> range = ss.getPrimaryRangeForEndpoint(bootstrapSource); Token token = StorageService.getPartitioner().midpoint(range.left, range.right); assert range.contains(token); + Gossiper.instance.injectApplicationState(bootstrapAddrs[i], ApplicationState.TOKENS, ss.valueFactory.tokens(Collections.singleton(token))); ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS, - StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token), bootstrapHostIds[i])); + StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token))); } // any further attempt to bootsrtap should fail since every node in the cluster is splitting. @@ -123,7 +124,7 @@ public class BootStrapperTest extends SchemaLoader Token token = StorageService.getPartitioner().midpoint(range.left, range.right); ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS, - StorageService.instance.valueFactory.normal(Collections.singleton(token), bootstrapHostIds[2])); + StorageService.instance.valueFactory.normal(Collections.singleton(token))); load.put(bootstrapAddrs[2], 0d); InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load); assert addr != null && addr.equals(addrs[2]); @@ -155,9 +156,10 @@ public class BootStrapperTest extends SchemaLoader Range<Token> range5 = ss.getPrimaryRangeForEndpoint(five); Token fakeToken = StorageService.getPartitioner().midpoint(range5.left, range5.right); assert range5.contains(fakeToken); + Gossiper.instance.injectApplicationState(myEndpoint, ApplicationState.TOKENS, ss.valueFactory.tokens(Collections.singleton(fakeToken))); ss.onChange(myEndpoint, ApplicationState.STATUS, - StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken), UUID.randomUUID())); + StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken))); tmd = ss.getTokenMetadata(); InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/gms/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java index b14608f..c27218b 100644 --- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java @@ -114,8 +114,7 @@ public class SerializationsTest extends AbstractSerializationsTester private static EndpointState EndpointSt = new EndpointState(HeartbeatSt); private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner()); private static VersionedValue vv0 = vvFact.load(23d); - private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()), - UUID.randomUUID()); + private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken())); private static List<GossipDigest> Digests = new ArrayList<GossipDigest>(); { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java index acf5173..dd4e1f2 100644 --- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java +++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java @@ -98,7 +98,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE); assert tmd.isMember(REMOTE); - Gossiper.instance.initializeNodeUnsafe(REMOTE, 1); + Gossiper.instance.initializeNodeUnsafe(REMOTE, UUID.randomUUID(), 1); local_range = StorageService.instance.getLocalPrimaryRange(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index 3b76c2c..7e2130c 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -164,16 +164,17 @@ public class LeaveAndBootstrapTest // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); - Gossiper.instance.initializeNodeUnsafe(boot1, 1); - UUID boot1Id = UUID.randomUUID(); + Gossiper.instance.initializeNodeUnsafe(boot1, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(boot1, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(5)))); ss.onChange(boot1, ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), boot1Id)); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)))); InetAddress boot2 = InetAddress.getByName("127.0.1.2"); - Gossiper.instance.initializeNodeUnsafe(boot2, 1); + Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7)))); ss.onChange(boot2, ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID())); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)))); Collection<InetAddress> endpoints = null; @@ -329,7 +330,7 @@ public class LeaveAndBootstrapTest valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[0])), Gossiper.computeExpireTime())); ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS, valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[2])), Gossiper.computeExpireTime())); - ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5)), boot1Id)); + ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5)))); // adjust precalcuated results. this changes what the epected endpoints are. expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8")); @@ -462,18 +463,20 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().isEmpty()); // Bootstrap the node immedidiately to keyTokens.get(4) without going through STATE_LEFT + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(4)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4)), hostIds.get(2))); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4)))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2))); // Bootstrap node hosts.get(3) to keyTokens.get(1) + Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1)))); ss.onChange(hosts.get(3), ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3))); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)))); assertFalse(tmd.isMember(hosts.get(3))); assertFalse(tmd.isLeaving(hosts.get(3))); @@ -481,9 +484,10 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Bootstrap node hosts.get(2) further to keyTokens.get(3) + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(3)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3)), hostIds.get(2))); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3)))); assertFalse(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -492,10 +496,10 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // Go to normal again for both nodes - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3)), - hostIds.get(2))); - ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)), - hostIds.get(3))); + Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(2)))); + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(3)))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3)))); + ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)))); assertTrue(tmd.isMember(hosts.get(2))); assertFalse(tmd.isLeaving(hosts.get(2))); @@ -531,8 +535,8 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2))); // back to normal - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)), - hostIds.get(2))); + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(2)))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)))); assertTrue(tmd.getLeavingEndpoints().isEmpty()); assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2))); @@ -541,8 +545,8 @@ public class LeaveAndBootstrapTest ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(2)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(Collections.singleton(keyTokens.get(2)), Gossiper.computeExpireTime())); - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)), - hostIds.get(2))); + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(4)))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)))); assertTrue(tmd.getBootstrapTokens().isEmpty()); assertTrue(tmd.getLeavingEndpoints().isEmpty()); @@ -567,6 +571,7 @@ public class LeaveAndBootstrapTest Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6); // node 2 leaves with _different_ token + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(0)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(0)))); assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(0))); @@ -574,9 +579,10 @@ public class LeaveAndBootstrapTest assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null); // go to boostrap + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(2))); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)))); assertFalse(tmd.isLeaving(hosts.get(2))); assertTrue(tmd.getBootstrapTokens().size() == 1); @@ -621,13 +627,15 @@ public class LeaveAndBootstrapTest assertFalse(tmd.isMember(hosts.get(2))); // node hosts.get(4) goes to bootstrap - ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3))); + Gossiper.instance.injectApplicationState(hosts.get(3), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1)))); + ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)))); assertFalse(tmd.isMember(hosts.get(3))); assertTrue(tmd.getBootstrapTokens().size() == 1); assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3))); // and then directly to 'left' + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(1)))); ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/MoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index cf42564..ce7864c 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -136,7 +136,7 @@ public class MoveTest } // moving endpoint back to the normal state - ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), hostIds.get(MOVING_NODE))); + ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken))); } /* @@ -181,15 +181,17 @@ public class MoveTest // boot two new nodes with keyTokens.get(5) and keyTokens.get(7) InetAddress boot1 = InetAddress.getByName("127.0.1.1"); - Gossiper.instance.initializeNodeUnsafe(boot1, 1); + Gossiper.instance.initializeNodeUnsafe(boot1, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(boot1, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(5)))); ss.onChange(boot1, ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), UUID.randomUUID())); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)))); InetAddress boot2 = InetAddress.getByName("127.0.1.2"); - Gossiper.instance.initializeNodeUnsafe(boot2, 1); + Gossiper.instance.initializeNodeUnsafe(boot2, UUID.randomUUID(), 1); + Gossiper.instance.injectApplicationState(boot2, ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(keyTokens.get(7)))); ss.onChange(boot2, ApplicationState.STATUS, - valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID())); + valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)))); // don't require test update every time a new keyspace is added to test/conf/cassandra.yaml Map<String, AbstractReplicationStrategy> tableStrategyMap = new HashMap<String, AbstractReplicationStrategy>(); @@ -477,7 +479,7 @@ public class MoveTest { ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, - valueFactory.normal(Collections.singleton(newTokens.get(movingIndex)), hostIds.get(movingIndex))); + valueFactory.normal(Collections.singleton(newTokens.get(movingIndex)))); } } @@ -506,8 +508,8 @@ public class MoveTest assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2))); // back to normal - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), - hostIds.get(2))); + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(newToken))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken))); assertTrue(tmd.getMovingEndpoints().isEmpty()); assertTrue(tmd.getToken(hosts.get(2)).equals(newToken)); @@ -515,8 +517,8 @@ public class MoveTest newToken = positionToken(8); // node 2 goes through leave and left and then jumps to normal at its new token ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.moving(newToken)); - ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), - hostIds.get(2))); + Gossiper.instance.injectApplicationState(hosts.get(2), ApplicationState.TOKENS, valueFactory.tokens(Collections.singleton(newToken))); + ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken))); assertTrue(tmd.getBootstrapTokens().isEmpty()); assertTrue(tmd.getMovingEndpoints().isEmpty()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b475bc69/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index cfbc013..58ae797 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -88,10 +88,6 @@ public class RemoveTest MessagingService.instance().listen(FBUtilities.getBroadcastAddress()); Gossiper.instance.start(1); - for (int i = 0; i < 6; i++) - { - Gossiper.instance.initializeNodeUnsafe(hosts.get(i), 1); - } removalhost = hosts.get(5); hosts.remove(removalhost); removalId = hostIds.get(5);
