Author: brandonwilliams
Date: Thu Feb  3 23:33:51 2011
New Revision: 1067032

URL: http://svn.apache.org/viewvc?rev=1067032&view=rev
Log:
Fix race between HH and schema changes.
Patch by brandonwilliams, reviewed by gdusbabek for CASSANDRA-2083

Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067032&r1=1067031&r2=1067032&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Thu Feb  3 23:33:51 2011
@@ -72,6 +72,8 @@ public class DefinitionsUpdateResponseVe
                                 try
                                 {
                                     m.apply();
+                                    // update gossip, but don't contact nodes directly
+                                    m.passiveAnnounce();
                                 }
                                 catch (ConfigurationException ex)
                                 {

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067032&r1=1067031&r2=1067032&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Feb  3 23:33:51 2011
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -233,10 +236,41 @@ public class HintedHandOffManager implem
                                 ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1))
                             };
     }
+
+    private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
+    {
+        Gossiper gossiper = Gossiper.instance;
+        int waited = 0;
+        while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
+                gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
+        {
+            Thread.sleep(1000);
+            waited += 1000;
+            if (waited > 2 * StorageService.RING_DELAY)
+                throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+        }
+        logger_.debug("schema for {} matches local schema", endpoint);
+        return waited;
+    }
             
-    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
     {
+        logger_.info("Checking remote schema before delivering hints");
+        int waited = waitForSchemaAgreement(endpoint);
+        // sleep a random amount to stagger handoff delivery from different replicas.
+        // (if we had to wait, then gossiper randomness took care of that for us already.)
+        if (waited == 0) {
+            int sleep = new Random().nextInt(60000);
+            logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
+            Thread.sleep(sleep);
+        }
+        if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+        {
+            logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+            return;
+        }
         logger_.info("Started hinted handoff for endpoint " + endpoint);
+
         queuedDeliveries.remove(endpoint);
 
         // 1. Get the key of the endpoint we need to handoff

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067032&r1=1067031&r2=1067032&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu Feb  3 23:33:51 2011
@@ -175,10 +175,15 @@ public abstract class Migration
         if (StorageService.instance.isClientMode())
             return;
         
-        // immediate notification for esiting nodes.
+        // immediate notification for existing nodes.
         MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers());
     }
-    
+
+    public final void passiveAnnounce()
+    {
+        MigrationManager.passiveAnnounce(newVersion);
+    }
+
     public static UUID getLastMigrationId()
     {
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067032&r1=1067031&r2=1067032&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Thu Feb  3 23:33:51 2011
@@ -89,15 +89,21 @@ public class MigrationManager implements
         }
     }
 
-    /** announce my version to a set of hosts.  They may culminate with them sending me migrations. */
+    /** actively announce my version to a set of hosts via rpc.  They may culminate with them sending me migrations. */
     public static void announce(UUID version, Set<InetAddress> hosts)
     {
         Message msg = makeVersionMessage(version);
         for (InetAddress host : hosts)
             MessagingService.instance().sendOneWay(msg, host);
-        // this is for notifying nodes as they arrive in the cluster.
+        passiveAnnounce(version);
+    }
+
+    /** announce my version passively over gossip **/
+    public static void passiveAnnounce(UUID version)
+    {
         if (!StorageService.instance.isClientMode())
             Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
+        logger.debug("Announcing my schema is " + version);
     }
 
     /**
@@ -152,6 +158,7 @@ public class MigrationManager implements
                 throw new IOException(e);
             }
         }
+        passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip
     }
     
     /** pushes migrations from this host to another host */


Reply via email to