Updated Branches:
  refs/heads/trunk bbcbfd865 -> e6530cc37

Gossip protocol version, use it to determine if new host id should be
used.
Patch by brandonwilliams, reviewed by Vijay for CASSANDRA-4317


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e6530cc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e6530cc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e6530cc3

Branch: refs/heads/trunk
Commit: e6530cc3723a7d2fdc84400bf8cd722474eed589
Parents: bbcbfd8
Author: Brandon Williams <[email protected]>
Authored: Tue Jun 19 21:10:38 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Tue Jun 19 21:10:38 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/ApplicationState.java |    1 +
 .../org/apache/cassandra/gms/VersionedValue.java   |    7 ++++
 .../org/apache/cassandra/net/MessagingService.java |    5 +++
 .../apache/cassandra/service/StorageService.java   |   26 ++++++++++++--
 4 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java 
b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 4520426..518aa80 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -29,6 +29,7 @@ public enum ApplicationState
     INTERNAL_IP,
     RPC_ADDRESS,
     SEVERITY,
+    NET_VERSION,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 61bcbe5..ff41fcb 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -64,6 +64,8 @@ public class VersionedValue implements 
Comparable<VersionedValue>
 
     // values for ApplicationState.REMOVAL_COORDINATOR
     public final static String REMOVAL_COORDINATOR = "REMOVER";
+    // network proto version from MS
+    public final static String NET_VERSION = "NET_VERSION";
 
     public final int version;
     public final String value;
@@ -184,6 +186,11 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         {
             return new VersionedValue(FBUtilities.getReleaseVersionString());
         }
+        
+        public VersionedValue networkVersion()
+        {
+            return new 
VersionedValue(String.valueOf(MessagingService.current_version));
+        }
 
         public VersionedValue internalIP(String private_ip)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 9c92402..c0af377 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -798,6 +798,11 @@ public final class MessagingService implements 
MessagingServiceMBean
         return getVersion(InetAddress.getByName(address));
     }
 
+    public boolean knowsVersion(InetAddress endpoint)
+    {
+        return versions.get(endpoint) != null;
+    }
+
     public void incrementDroppedMessages(Verb verb)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not 
legally be dropped";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e6530cc3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 0455b1d..012f2ec 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -318,6 +318,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         setMode(Mode.CLIENT, false);
         Gossiper.instance.register(this);
         Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // 
needed for node-ring gathering.
+        
Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, 
valueFactory.networkVersion());
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
 
         // sleep a while to allow gossip to warm up (the other nodes need to 
know about this one before they can reply).
@@ -465,6 +466,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         Gossiper.instance.register(this);
         Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // 
needed for node-ring gathering.
+        // gossip network proto version
+        
Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, 
valueFactory.networkVersion());
         // gossip schema version when gossiper is running
         Schema.instance.updateVersionAndAnnounce();
         // add rpc listening info
@@ -1005,6 +1008,21 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
+     * Checks MS for the version, provided MS _really_ knows it (has directly 
communicated with the node) otherwise falls back to checking the gossipped 
version (learned about this node indirectly)
+     * If both fail, the node is too old to use hostid-style status 
serialization
+     * @param endpoint
+     * @return boolean whether or not to use hostid
+     */
+    private boolean usesHostId(InetAddress endpoint)
+    {
+        if (MessagingService.instance().knowsVersion(endpoint) && 
MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+            return true;
+        else if 
(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION)
 != null && 
Integer.valueOf(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NET_VERSION).value)
 >= MessagingService.VERSION_12)
+            return true;
+        return false;
+    }
+
+    /**
      * Handle node bootstrap
      *
      * @param endpoint bootstrapping node
@@ -1018,7 +1036,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokenPos;
-        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
         {
             assert pieces.length >= 3;
             tokenPos = 2;
@@ -1048,7 +1066,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         tokenMetadata.addBootstrapToken(token, endpoint);
         calculatePendingRanges();
 
-        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
     }
 
@@ -1067,7 +1085,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         //   versions  < 1.2 .....: STATUS,TOKEN
         //   versions >= 1.2 .....: STATUS,HOST_ID,TOKEN,TOKEN,...
         int tokensPos;
-        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
         {
             assert pieces.length >= 3;
             tokensPos = 2;
@@ -1084,7 +1102,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
             logger.info("Node " + endpoint + " state jump to normal");
 
         // Order Matters, TM.updateHostID() should be called before 
TM.updateNormalToken(), (see CASSANDRA-4300).
-        if (MessagingService.instance().getVersion(endpoint) >= 
MessagingService.VERSION_12)
+        if (usesHostId(endpoint))
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
 
         // we don't want to update if this node is responsible for the token 
and it has a later startup time than endpoint.

Reply via email to