Author: brandonwilliams
Date: Wed Oct 12 16:17:02 2011
New Revision: 1182457

URL: http://svn.apache.org/viewvc?rev=1182457&view=rev
Log:
Expire dead gossip states based on time.
Patch by Jérémy Sevellec, reviewed by Paul Cannon and brandonwilliams
for CASSANDRA-2961

Modified:
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1182457&r1=1182456&r2=1182457&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/Gossiper.java Wed Oct 12 16:17:02 2011
@@ -75,7 +75,7 @@ public class Gossiper implements IFailur
     private static Logger logger = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
-    private long aVeryLongTime;
+    public static final long aVeryLongTime = 259200 * 1000; // 3 days    
     private long FatClientTimeout;
     private Random random = new Random();
     private Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>()
@@ -107,6 +107,8 @@ public class Gossiper implements IFailur
      * gossip gets propagated to all nodes */
     private Map<InetAddress, Long> justRemovedEndpoints = new ConcurrentHashMap<InetAddress, Long>();
     
+    private Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
+    
     // protocol versions of the other nodes in the cluster
     private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>();
 
@@ -174,8 +176,6 @@ public class Gossiper implements IFailur
 
     private Gossiper()
     {
-        // 3 days
-        aVeryLongTime = 259200 * 1000;
         // half of QUARATINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
         FatClientTimeout = (long)(QUARANTINE_DELAY / 2);
         /* register with the Failure Detector for receiving Failure detector events */
@@ -296,6 +296,7 @@ public class Gossiper implements IFailur
     {
         unreachableEndpoints.remove(endpoint);
         endpointStateMap.remove(endpoint);
+        expireTimeEndpointMap.remove(endpoint);
         justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
         if (logger.isDebugEnabled())
             logger.debug("evicting " + endpoint + " from gossip");
@@ -417,7 +418,7 @@ public class Gossiper implements IFailur
         EndpointState epState = endpointStateMap.get(endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
         epState.getHeartBeatState().forceNewerGenerationUnsafe();
-        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token,computeExpireTime()));
         logger.info("Completing removal of " + endpoint);
         endpointStateMap.put(endpoint, epState);
         // ensure at least one gossip round occurs before returning
@@ -572,8 +573,14 @@ public class Gossiper implements IFailur
                     evictFromMembership(endpoint); // can get rid of the state immediately
                 }
 
-                if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
+                long expireTime = getExpireTimeForEndpoint(endpoint);
+                if (!epState.isAlive() && (now > expireTime)
+                        && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                 {
+                    if (logger.isDebugEnabled())
+                    {
+                        logger.debug("time is expiring for endpoint : " + endpoint + " (" + expireTime + ")");
+                    }
                     evictFromMembership(endpoint);
                 }
             }
@@ -592,6 +599,17 @@ public class Gossiper implements IFailur
             }
         }
     }
+    
+    protected long getExpireTimeForEndpoint(InetAddress endpoint)
+    {
+        /* default expireTime is aVeryLongTime */
+        long expireTime = computeExpireTime();
+        if (expireTimeEndpointMap.containsKey(endpoint))
+        {
+            expireTime = expireTimeEndpointMap.get(endpoint);
+        }
+        return expireTime;
+    }
 
     public EndpointState getEndpointStateForEndpoint(InetAddress ep)
     {
@@ -719,6 +737,8 @@ public class Gossiper implements IFailur
         localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
         liveEndpoints.add(addr);
         unreachableEndpoints.remove(addr);
+        expireTimeEndpointMap.remove(addr);
+        logger.debug("removing expire time for endpoint : " + addr);
         logger.info("InetAddress {} is now UP", addr);
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onAlive(addr, localState);
@@ -1066,5 +1086,18 @@ public class Gossiper implements IFailur
     {
         return getCurrentGenerationNumber(InetAddress.getByName(address));
     }
+    
+    public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime)
+    {
+        if (logger.isDebugEnabled())
+        {
+            logger.debug("adding expire time for endpoint : " + endpoint + " (" + expireTime + ")");
+        }
+        expireTimeEndpointMap.put(endpoint, expireTime);
+    }
+    
+    public static long computeExpireTime() {
+        return System.currentTimeMillis() + Gossiper.aVeryLongTime;
+    }
 
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1182457&r1=1182456&r2=1182457&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/gms/VersionedValue.java Wed Oct 12 16:17:02 2011
@@ -125,9 +125,10 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.STATUS_LEAVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue left(Token token)
+        public VersionedValue left(Token token, long expireTime)
         {
-            return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER
+                    + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime);
         }
 
         public VersionedValue moving(Token token)
@@ -140,9 +141,10 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removedNonlocal(Token token)
+        public VersionedValue removedNonlocal(Token token, long expireTime)
         {
-            return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+                       return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER
+                                       + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime);
         }
 
         public VersionedValue removalCoordinator(Token token)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1182457&r1=1182456&r2=1182457&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 12 16:17:02 2011
@@ -969,7 +969,7 @@ public class StorageService implements I
         if (logger_.isDebugEnabled())
             logger_.debug("Node " + endpoint + " state left, token " + token);
 
-        excise(token, endpoint);
+        excise(token, endpoint, extractExpireTime(pieces));
     }
 
     /**
@@ -1021,7 +1021,7 @@ public class StorageService implements I
 
             if (VersionedValue.REMOVED_TOKEN.equals(state))
             {
-                excise(removeToken, endpoint);
+                excise(removeToken, endpoint, extractExpireTime(pieces));
             }
             else if (VersionedValue.REMOVING_TOKEN.equals(state))
             {
@@ -1054,6 +1054,30 @@ public class StorageService implements I
             SystemTable.removeToken(token);
         }
     }
+    
+    private void excise(Token token, InetAddress endpoint, long expireTime)
+    {
+        addExpireTimeIfFound(endpoint, expireTime);
+        excise(token, endpoint);
+    }
+    
+    protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime)
+    {
+        if (expireTime != 0L)
+        {
+            Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime);
+        }
+    }
+
+    protected long extractExpireTime(String[] pieces)
+    {
+        long expireTime = 0L;
+        if (pieces.length >= 3)
+        {
+            expireTime = Long.parseLong(pieces[2]);
+        }
+        return expireTime;
+    }
 
     /**
      * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
@@ -1978,7 +2002,7 @@ public class StorageService implements I
         tokenMetadata_.removeEndpoint(FBUtilities.getBroadcastAddress());
         calculatePendingRanges();
 
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken()));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken(),Gossiper.computeExpireTime()));
         logger_.info("Announcing that I have left the ring for " + RING_DELAY + "ms");
         try
         {

Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java?rev=1182457&r1=1182456&r2=1182457&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java Wed Oct 12 16:17:02 2011
@@ -35,6 +35,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.SimpleSnitch;
@@ -299,8 +300,10 @@ public class LeaveAndBootstrapTest exten
 
         // Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still
         // leaving and boot2 in progress
-        ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(LEAVING[0])));
-        ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(LEAVING[2])));
+        ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS,
+                valueFactory.left(endpointTokens.get(LEAVING[0]), Gossiper.computeExpireTime()));
+        ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
+                valueFactory.left(endpointTokens.get(LEAVING[2]), Gossiper.computeExpireTime()));
         ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5)));
 
         // adjust precalcuated results.  this changes what the epected endpoints are.
@@ -506,7 +509,8 @@ public class LeaveAndBootstrapTest exten
 
         // node 3 goes through leave and left and then jumps to normal at its new token
         ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(2)));
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS,
+                valueFactory.left(keyTokens.get(2), Gossiper.computeExpireTime()));
         ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4)));
 
         assertTrue(tmd.getBootstrapTokens().isEmpty());
@@ -556,7 +560,8 @@ public class LeaveAndBootstrapTest exten
         assertTrue(tmd.getBootstrapTokens().isEmpty());
 
         // go to state left
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(1)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS,
+                valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -583,7 +588,8 @@ public class LeaveAndBootstrapTest exten
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 7);
 
         // node hosts.get(2) goes jumps to left
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(endpointTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS,
+                valueFactory.left(endpointTokens.get(2), Gossiper.computeExpireTime()));
 
         assertFalse(tmd.isMember(hosts.get(2)));
 
@@ -595,7 +601,8 @@ public class LeaveAndBootstrapTest exten
         assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
 
         // and then directly to 'left'
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.left(keyTokens.get(1)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS,
+                valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
 
         assertTrue(tmd.getBootstrapTokens().size() == 0);
         assertFalse(tmd.isMember(hosts.get(2)));


Reply via email to