gossip host ID; Maintain a mapping of endpoint to ID

Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4120


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ad685c46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ad685c46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ad685c46

Branch: refs/heads/trunk
Commit: ad685c4615b08725488fdf26c1dd248cfe196cf8
Parents: 712ffeb
Author: Eric Evans <[email protected]>
Authored: Wed May 2 18:47:13 2012 -0500
Committer: Eric Evans <[email protected]>
Committed: Wed May 2 18:47:13 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/VersionedValue.java   |   33 +++++++--
 .../apache/cassandra/locator/TokenMetadata.java    |   52 +++++++++++++
 .../org/apache/cassandra/net/MessagingService.java |    3 +-
 .../apache/cassandra/service/StorageService.java   |   56 +++++++++++++-
 .../cassandra/service/StorageServiceMBean.java     |    7 ++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   18 +++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |   10 +++
 test/unit/org/apache/cassandra/Util.java           |   12 +++-
 .../org/apache/cassandra/dht/BootStrapperTest.java |   20 +++++-
 .../apache/cassandra/gms/SerializationsTest.java   |    3 +-
 .../cassandra/service/LeaveAndBootstrapTest.java   |   43 +++++++-----
 .../org/apache/cassandra/service/MoveTest.java     |   21 +++---
 .../org/apache/cassandra/service/RemoveTest.java   |    4 +-
 13 files changed, 238 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 36ff1d9..25225c5 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,7 +24,9 @@ import java.util.UUID;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
 
 
 /**
@@ -88,6 +90,11 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         return "Value(" + value + "," + version + ")";
     }
 
+    private static String versionString(String...args)
+    {
+        return StringUtils.join(args, VersionedValue.DELIMITER);
+    }
+
     public static class VersionedValueFactory
     {
         final IPartitioner partitioner;
@@ -97,14 +104,18 @@ public class VersionedValue implements 
Comparable<VersionedValue>
             this.partitioner = partitioner;
         }
 
-        public VersionedValue bootstrapping(Token token)
+        public VersionedValue bootstrapping(Token token, UUID hostId)
         {
-            return new VersionedValue(VersionedValue.STATUS_BOOTSTRAPPING + 
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
+                                                    hostId.toString(),
+                                                    
partitioner.getTokenFactory().toString(token)));
         }
 
-        public VersionedValue normal(Token token)
+        public VersionedValue normal(Token token, UUID hostId)
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL + 
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new 
VersionedValue(versionString(VersionedValue.STATUS_NORMAL,
+                                                    hostId.toString(),
+                                                    
partitioner.getTokenFactory().toString(token)));
         }
 
         public VersionedValue load(double load)
@@ -189,7 +200,19 @@ public class VersionedValue implements 
Comparable<VersionedValue>
     {
         public void serialize(VersionedValue value, DataOutput dos, int 
version) throws IOException
         {
-            dos.writeUTF(value.value);
+            String outValue = value.value;
+
+            if (version < MessagingService.VERSION_12)
+            {
+                String[] pieces = value.value.split(DELIMITER_STR, -1);
+                if ((pieces[0] == STATUS_NORMAL) || pieces[0] == 
STATUS_BOOTSTRAPPING)
+                {
+                    assert pieces.length >= 3;
+                    outValue = versionString(pieces[0], pieces[2]);
+                }
+            }
+
+            dos.writeUTF(outValue);
             dos.writeInt(value.version);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 7e72eb4..1cb2a61 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.StorageService;
 
 public class TokenMetadata
@@ -42,6 +43,9 @@ public class TokenMetadata
     /* Maintains token to endpoint map of every node in the cluster. */
     private final BiMap<Token, InetAddress> tokenToEndpointMap;
 
+    /* Maintains endpoint to host ID map of every node in the cluster */
+    private final BiMap<InetAddress, UUID> endpointToHostIdMap;
+
     // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> 
pendingRanges<tt>,
     // which was added to when a node began bootstrap and removed from when it 
finished.
     //
@@ -93,6 +97,7 @@ public class TokenMetadata
         if (tokenToEndpointMap == null)
             tokenToEndpointMap = HashBiMap.create();
         this.tokenToEndpointMap = tokenToEndpointMap;
+        endpointToHostIdMap = HashBiMap.create();
         sortedTokens = sortTokens();
     }
 
@@ -172,6 +177,51 @@ public class TokenMetadata
         }
     }
 
+    /**
+     * Store an end-point to host ID mapping.  Each ID must be unique, and
+     * cannot be changed after the fact.
+     *
+     * @param hostId
+     * @param endpoint
+     */
+    public void updateHostId(UUID hostId, InetAddress endpoint)
+    {
+        assert hostId != null;
+        assert endpoint != null;
+
+        InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId);
+        if (storedEp != null)
+        {
+            if (!storedEp.equals(endpoint) && 
(FailureDetector.instance.isAlive(storedEp)))
+            {
+                throw new RuntimeException(String.format("Host ID collision 
between active endpoint %s and %s (id=%s)",
+                                                         storedEp,
+                                                         endpoint,
+                                                         hostId));
+            }
+        }
+
+        UUID storedId = endpointToHostIdMap.get(endpoint);
+        if ((storedId != null) && (!storedId.equals(hostId)))
+            logger.warn("Changing {}'s host ID from {} to {}", new Object[] 
{endpoint, storedId, hostId});
+
+        endpointToHostIdMap.forcePut(endpoint, hostId);
+    }
+
+    /** Return the unique host ID for an end-point. */
+    public UUID getHostId(InetAddress endpoint)
+    {
+        return endpointToHostIdMap.get(endpoint);
+    }
+
+    /** @return a copy of the endpoint-to-id map for read-only operations */
+    public Map<InetAddress, UUID> getEndpointToHostIdMapForReading()
+    {
+        Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
+        readMap.putAll(endpointToHostIdMap);
+        return readMap;
+    }
+
     public void addBootstrapToken(Token token, InetAddress endpoint)
     {
         assert token != null;
@@ -260,6 +310,7 @@ public class TokenMetadata
             bootstrapTokens.inverse().remove(endpoint);
             tokenToEndpointMap.inverse().remove(endpoint);
             leavingEndpoints.remove(endpoint);
+            endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
             invalidateCaches();
         }
@@ -607,6 +658,7 @@ public class TokenMetadata
         tokenToEndpointMap.clear();
         leavingEndpoints.clear();
         pendingRanges.clear();
+        endpointToHostIdMap.clear();
         invalidateCaches();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 96fac99..c9a928e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -70,7 +70,8 @@ public final class MessagingService implements 
MessagingServiceMBean
     public static final int VERSION_080 = 2;
     public static final int VERSION_10 = 3;
     public static final int VERSION_11 = 4;
-    public static final int current_version = VERSION_11;
+    public static final int VERSION_12 = 5;
+    public static final int current_version = VERSION_12;
 
     static SerializerType serializerType = SerializerType.BINARY;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 6b195c5..64ec4b6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -252,7 +252,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
             logger.debug("Setting token to {}", token);
         SystemTable.updateToken(token);
         tokenMetadata.updateNormalToken(token, 
FBUtilities.getBroadcastAddress());
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.normal(getLocalToken()));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
+                                                   
valueFactory.normal(getLocalToken(), SystemTable.getLocalHostId()));
         setMode(Mode.NORMAL, false);
     }
 
@@ -533,6 +534,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         logger.info("Starting up server gossip");
         joined = true;
 
+        // Seed the host ID-to-endpoint map with our own ID.
+        getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), 
FBUtilities.getBroadcastAddress());
 
         // have to start the gossip service before we can see any info on 
other nodes.  this is necessary
         // for bootstrap to get the load info it needs.
@@ -759,7 +762,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         if (null == DatabaseDescriptor.getReplaceToken())
         {
             // if not an existing token then bootstrap
-            
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.bootstrapping(token));
+            Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
+                                                       
valueFactory.bootstrapping(token, SystemTable.getLocalHostId()));
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending 
range setup", true);
             try
             {
@@ -972,6 +976,19 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         return mapString;
     }
 
+    public String getLocalHostId()
+    {
+        return 
getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString();
+    }
+
+    public Map<String, String> getHostIdMap()
+    {
+        Map<String, String> mapOut = new HashMap<String, String>();
+        for (Map.Entry<InetAddress, UUID> entry : 
getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
+            mapOut.put(entry.getKey().getHostAddress(), 
entry.getValue().toString());
+        return mapOut;
+    }
+
     /**
      * Construct the range to endpoint mapping based on the true view
      * of the world.
@@ -1074,7 +1091,19 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
     private void handleStateBootstrap(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
+
+        // Parse versioned values according to end-point version:
+        //   versions  < 1.2 .....: STATUS,TOKEN
+        //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
+        int tokenPos;
+        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        {
+            assert pieces.length >= 3;
+            tokenPos = 2;
+        }
+            else tokenPos = 1;
+
+        Token token = 
getPartitioner().getTokenFactory().fromString(pieces[tokenPos]);
 
         if (logger.isDebugEnabled())
             logger.debug("Node " + endpoint + " state bootstrapping, token " + 
token);
@@ -1096,6 +1125,9 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
 
         tokenMetadata.addBootstrapToken(token, endpoint);
         calculatePendingRanges();
+
+        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+            tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
     }
 
     /**
@@ -1108,7 +1140,20 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
     private void handleStateNormal(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
+
+        // Parse versioned values according to end-point version:
+        //   versions  < 1.2 .....: STATUS,TOKEN
+        //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
+        int tokensPos;
+        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        {
+            assert pieces.length >= 3;
+            tokensPos = 2;
+        }
+        else
+            tokensPos = 1;
+
+        Token token = 
getPartitioner().getTokenFactory().fromString(pieces[tokensPos]);
 
         if (logger.isDebugEnabled())
             logger.debug("Node " + endpoint + " state normal, token " + token);
@@ -1150,6 +1195,9 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
             tokenMetadata.removeFromMoving(endpoint);
 
         calculatePendingRanges();
+
+        if (Gossiper.instance.getVersion(endpoint) >= 
MessagingService.VERSION_12)
+            tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 12f714c..2af4215 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -154,6 +155,12 @@ public interface StorageServiceMBean
      */
     public Map<String, String> getTokenToEndpointMap();
 
+    /** Retrieve this hosts unique ID */
+    public String getLocalHostId();
+
+    /** Retrieve the mapping of endpoint to host ID */
+    public Map<String, String> getHostIdMap();
+
     /**
      * Numeric load value.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index c7ecd7d..4f2fd8b 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -94,6 +94,7 @@ public class NodeCmd
         GETCOMPACTIONTHRESHOLD,
         GETENDPOINTS,
         GOSSIPINFO,
+        IDS,
         INFO,
         INVALIDATEKEYCACHE,
         INVALIDATEROWCACHE,
@@ -137,6 +138,7 @@ public class NodeCmd
         addCmdHelp(header, "join", "Join the ring");
         addCmdHelp(header, "info", "Print node informations (uptime, load, 
...)");
         addCmdHelp(header, "cfstats", "Print statistics on column families");
+        addCmdHelp(header, "ids", "Print list of unique host IDs");
         addCmdHelp(header, "version", "Print cassandra version");
         addCmdHelp(header, "tpstats", "Print usage statistics of thread 
pools");
         addCmdHelp(header, "proxyhistograms", "Print statistic histograms for 
network operations");
@@ -282,6 +284,20 @@ public class NodeCmd
         }
     }
 
+    /** Writes a table of host IDs to a PrintStream */
+    public void printHostIds(PrintStream outs)
+    {
+        System.out.print(String.format("%-16s %-7s %s%n", "Address", "Status", 
"Host ID"));
+        for (Map.Entry<String, String> entry : probe.getHostIdMap().entrySet())
+        {
+            String status;
+            if      (probe.getLiveNodes().contains(entry.getKey()))        
status = "Up";
+            else if (probe.getUnreachableNodes().contains(entry.getKey())) 
status = "Down";
+            else                                                           
status = "?";
+            System.out.print(String.format("%-16s %-7s %s%n", entry.getKey(), 
status, entry.getValue()));
+        }
+    }
+
     public void printThreadPoolStats(PrintStream outs)
     {
         outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", 
"Pending", "Completed", "Blocked", "All time blocked");
@@ -315,6 +331,7 @@ public class NodeCmd
     {
         boolean gossipInitialized = probe.isInitialized();
         outs.printf("%-17s: %s%n", "Token", probe.getToken());
+        outs.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
         outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
         outs.printf("%-17s: %s%n", "Load", probe.getLoadString());
         if (gossipInitialized)
@@ -720,6 +737,7 @@ public class NodeCmd
                 case ENABLETHRIFT    : probe.startThriftServer(); break;
                 case STATUSTHRIFT    : 
nodeCmd.printIsThriftServerRunning(System.out); break;
                 case RESETLOCALSCHEMA: probe.resetLocalSchema(); break;
+                case IDS             : nodeCmd.printHostIds(System.out); break;
 
                 case DRAIN :
                     try { probe.drain(); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5dad98d..e342cf2 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -313,6 +313,16 @@ public class NodeProbe
         return ssProxy.getToken();
     }
 
+    public String getLocalHostId()
+    {
+        return ssProxy.getLocalHostId();
+    }
+
+    public Map<String, String> getHostIdMap()
+    {
+        return ssProxy.getHostIdMap();
+    }
+
     public String getLoadString()
     {
         return ssProxy.getLoadString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index fd5259a..a55787c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -50,6 +51,8 @@ import static org.junit.Assert.assertTrue;
 
 public class Util
 {
+    private static List<UUID> hostIdPool = new ArrayList<UUID>();
+
     public static DecoratedKey dk(String key)
     {
         return 
StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(key));
@@ -208,19 +211,24 @@ public class Util
      * Creates initial set of nodes and tokens. Nodes are added to 
StorageService as 'normal'
      */
     public static void createInitialRing(StorageService ss, IPartitioner 
partitioner, List<Token> endpointTokens,
-                                   List<Token> keyTokens, List<InetAddress> 
hosts, int howMany)
+                                   List<Token> keyTokens, List<InetAddress> 
hosts, List<UUID> hostIds, int howMany)
         throws UnknownHostException
     {
+        // Expand pool of host IDs as necessary
+        for (int i = hostIdPool.size(); i < howMany; i++)
+            hostIdPool.add(UUID.randomUUID());
+
         for (int i=0; i<howMany; i++)
         {
             endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
             keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
+            hostIds.add(hostIdPool.get(i));
         }
 
         for (int i=0; i<endpointTokens.size(); i++)
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + 
String.valueOf(i + 1));
-            ss.onChange(ep, ApplicationState.STATUS, new 
VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i)));
+            ss.onChange(ep, ApplicationState.STATUS, new 
VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i), 
hostIds.get(i)));
             hosts.add(ep);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 56eba35..6e9236f 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
@@ -75,6 +76,13 @@ public class BootStrapperTest extends SchemaLoader
             InetAddress.getByName("127.0.0.14"),
             InetAddress.getByName("127.0.0.15"),
         };
+        UUID[] bootstrapHostIds = new UUID[]
+        {
+            UUID.randomUUID(),
+            UUID.randomUUID(),
+            UUID.randomUUID(),
+            UUID.randomUUID(),
+        };
         Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
         for (int i = 0; i < addrs.length; i++)
         {
@@ -93,7 +101,9 @@ public class BootStrapperTest extends SchemaLoader
             Range<Token> range = 
ss.getPrimaryRangeForEndpoint(bootstrapSource);
             Token token = StorageService.getPartitioner().midpoint(range.left, 
range.right);
             assert range.contains(token);
-            ss.onChange(bootstrapAddrs[i], ApplicationState.STATUS, 
StorageService.instance.valueFactory.bootstrapping(token));
+            ss.onChange(bootstrapAddrs[i],
+                        ApplicationState.STATUS,
+                        
StorageService.instance.valueFactory.bootstrapping(token, bootstrapHostIds[i]));
         }
 
         // any further attempt to bootsrtap should fail since every node in 
the cluster is splitting.
@@ -110,7 +120,9 @@ public class BootStrapperTest extends SchemaLoader
         // indicate that one of the nodes is done. see if the node it was 
bootstrapping from is still available.
         Range<Token> range = ss.getPrimaryRangeForEndpoint(addrs[2]);
         Token token = StorageService.getPartitioner().midpoint(range.left, 
range.right);
-        ss.onChange(bootstrapAddrs[2], ApplicationState.STATUS, 
StorageService.instance.valueFactory.normal(token));
+        ss.onChange(bootstrapAddrs[2],
+                    ApplicationState.STATUS,
+                    StorageService.instance.valueFactory.normal(token, 
bootstrapHostIds[2]));
         load.put(bootstrapAddrs[2], 0d);
         InetAddress addr = 
BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
         assert addr != null && addr.equals(addrs[2]);
@@ -142,7 +154,9 @@ public class BootStrapperTest extends SchemaLoader
         Range<Token> range5 = ss.getPrimaryRangeForEndpoint(five);
         Token fakeToken = 
StorageService.getPartitioner().midpoint(range5.left, range5.right);
         assert range5.contains(fakeToken);
-        ss.onChange(myEndpoint, ApplicationState.STATUS, 
StorageService.instance.valueFactory.bootstrapping(fakeToken));
+        ss.onChange(myEndpoint,
+                    ApplicationState.STATUS,
+                    
StorageService.instance.valueFactory.bootstrapping(fakeToken, 
UUID.randomUUID()));
         tmd = ss.getTokenMetadata();
 
         InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java 
b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 0010465..4598f71 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
@@ -101,7 +102,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
         private static EndpointState EndpointSt = new 
EndpointState(HeartbeatSt);
         private static VersionedValue.VersionedValueFactory vvFact = new 
VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
         private static VersionedValue vv0 = vvFact.load(23d);
-        private static VersionedValue vv1 = 
vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken());
+        private static VersionedValue vv1 = 
vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken(), 
UUID.randomUUID());
         private static List<GossipDigest> Digests = new 
ArrayList<GossipDigest>();
 
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java 
b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index faa9e18..be53b53 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -82,8 +82,9 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
 
         Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, 
List<InetAddress>>();
         for (String table : Schema.instance.getNonSystemTables())
@@ -149,9 +150,10 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 10 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
 
         // nodes 6, 8 and 9 leave
         final int[] LEAVING = new int[] {6, 8, 9};
@@ -160,9 +162,10 @@ public class LeaveAndBootstrapTest
 
         // boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
         InetAddress boot1 = InetAddress.getByName("127.0.1.1");
-        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(5)));
+        UUID boot1Id = UUID.randomUUID();
+        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(5), boot1Id));
         InetAddress boot2 = InetAddress.getByName("127.0.1.2");
-        ss.onChange(boot2, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(7)));
+        ss.onChange(boot2, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
 
         Collection<InetAddress> endpoints = null;
 
@@ -318,7 +321,7 @@ public class LeaveAndBootstrapTest
                 valueFactory.left(endpointTokens.get(LEAVING[0]), 
Gossiper.computeExpireTime()));
         ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
                 valueFactory.left(endpointTokens.get(LEAVING[2]), 
Gossiper.computeExpireTime()));
-        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(5)));
+        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(5), boot1Id));
 
         // adjust precalcuated results.  this changes what the epected 
endpoints are.
         expectedEndpoints.get("Keyspace1").get(new 
BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
@@ -434,9 +437,10 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 5 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 7);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 7);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(endpointTokens.get(2)));
@@ -448,14 +452,14 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getBootstrapTokens().isEmpty());
 
         // Bootstrap the node immedidiately to keyTokens.get(4) without going 
through STATE_LEFT
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(4)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(4), hostIds.get(2)));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
         
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2)));
 
         // Bootstrap node hosts.get(3) to keyTokens.get(1)
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1)));
+        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
 
         assertFalse(tmd.isMember(hosts.get(3)));
         assertFalse(tmd.isLeaving(hosts.get(3)));
@@ -463,7 +467,7 @@ public class LeaveAndBootstrapTest
         
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
 
         // Bootstrap node hosts.get(2) further to keyTokens.get(3)
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(3)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(3), hostIds.get(2)));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -472,8 +476,8 @@ public class LeaveAndBootstrapTest
         
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
 
         // Go to normal again for both nodes
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(3)));
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(3), hostIds.get(2)));
+        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(2), hostIds.get(3)));
 
         assertTrue(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -497,9 +501,10 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 5 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 6);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(endpointTokens.get(2)));
@@ -508,7 +513,7 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
 
         // back to normal
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(2), hostIds.get(2)));
 
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
         assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2)));
@@ -517,7 +522,7 @@ public class LeaveAndBootstrapTest
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(keyTokens.get(2)));
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
                 valueFactory.left(keyTokens.get(2), 
Gossiper.computeExpireTime()));
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(4)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(keyTokens.get(4), hostIds.get(2)));
 
         assertTrue(tmd.getBootstrapTokens().isEmpty());
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
@@ -536,9 +541,10 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 5 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 6);
 
         // node 2 leaves with _different_ token
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(keyTokens.get(0)));
@@ -548,7 +554,7 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null);
 
         // go to boostrap
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(2)));
 
         assertFalse(tmd.isLeaving(hosts.get(2)));
         assertTrue(tmd.getBootstrapTokens().size() == 1);
@@ -581,9 +587,10 @@ public class LeaveAndBootstrapTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring of 6 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 7);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 7);
 
         // node hosts.get(2) goes jumps to left
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
@@ -592,7 +599,7 @@ public class LeaveAndBootstrapTest
         assertFalse(tmd.isMember(hosts.get(2)));
 
         // node hosts.get(4) goes to bootstrap
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1)));
+        ss.onChange(hosts.get(3), ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
 
         assertFalse(tmd.isMember(hosts.get(3)));
         assertTrue(tmd.getBootstrapTokens().size() == 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java
index cbdad15..b44fa32 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -86,8 +86,9 @@ public class MoveTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
 
         Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, 
List<InetAddress>>();
         for (String table : Schema.instance.getNonSystemTables())
@@ -133,7 +134,7 @@ public class MoveTest
         }
 
         // moving endpoint back to the normal state
-        ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, 
valueFactory.normal(newToken));
+        ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, 
valueFactory.normal(newToken, hostIds.get(MOVING_NODE)));
     }
 
     /*
@@ -152,9 +153,10 @@ public class MoveTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 10 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, RING_SIZE);
 
         // nodes 6, 8 and 9 leave
         final int[] MOVING = new int[] {6, 8, 9};
@@ -177,9 +179,9 @@ public class MoveTest
 
         // boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
         InetAddress boot1 = InetAddress.getByName("127.0.1.1");
-        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(5)));
+        ss.onChange(boot1, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(5), UUID.randomUUID()));
         InetAddress boot2 = InetAddress.getByName("127.0.1.2");
-        ss.onChange(boot2, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(7)));
+        ss.onChange(boot2, ApplicationState.STATUS, 
valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
 
         // don't require test update every time a new keyspace is added to 
test/conf/cassandra.yaml
         Map<String, AbstractReplicationStrategy> tableStrategyMap = new 
HashMap<String, AbstractReplicationStrategy>();
@@ -465,7 +467,7 @@ public class MoveTest
         // all moving nodes are back to the normal state
         for (Integer movingIndex : MOVING)
         {
-            ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, 
valueFactory.normal(newTokens.get(movingIndex)));
+            ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, 
valueFactory.normal(newTokens.get(movingIndex), hostIds.get(movingIndex)));
         }
     }
 
@@ -481,9 +483,10 @@ public class MoveTest
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<UUID> hostIds = new ArrayList<UUID>();
 
         // create a ring or 6 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 6);
 
         // node 2 leaves
         Token newToken = positionToken(7);
@@ -493,7 +496,7 @@ public class MoveTest
         assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
 
         // back to normal
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(newToken));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(newToken, hostIds.get(2)));
 
         assertTrue(tmd.getMovingEndpoints().isEmpty());
         assertTrue(tmd.getToken(hosts.get(2)).equals(newToken));
@@ -501,7 +504,7 @@ public class MoveTest
         newToken = positionToken(8);
         // node 2 goes through leave and left and then jumps to normal at its 
new token
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.moving(newToken));
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(newToken));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.normal(newToken, hostIds.get(2)));
 
         assertTrue(tmd.getBootstrapTokens().isEmpty());
         assertTrue(tmd.getMovingEndpoints().isEmpty());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ad685c46/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java 
b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 910267a..e56e291 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.After;
@@ -57,6 +58,7 @@ public class RemoveTest
     ArrayList<Token> endpointTokens = new ArrayList<Token>();
     ArrayList<Token> keyTokens = new ArrayList<Token>();
     List<InetAddress> hosts = new ArrayList<InetAddress>();
+    List<UUID> hostIds = new ArrayList<UUID>();
     InetAddress removalhost;
     Token removaltoken;
 
@@ -80,7 +82,7 @@ public class RemoveTest
         tmd.clearUnsafe();
 
         // create a ring of 5 nodes
-        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 6);
 
         MessagingService.instance().listen(FBUtilities.getBroadcastAddress());
         Gossiper.instance.start(1);

Reply via email to