Author: brandonwilliams
Date: Thu Feb  3 17:58:46 2011
New Revision: 1066899

URL: http://svn.apache.org/viewvc?rev=1066899&view=rev
Log:
Check for schema agreement before delivering hints, add random sleep to
avoid DDoSing the node.
Patch by brandonwilliams and jbellis, reviewed by brandonwilliams for
CASSANDRA-2083

Modified:
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066899&r1=1066898&r2=1066899&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Thu Feb  3 17:58:46 2011
@@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -233,10 +236,33 @@ public class HintedHandOffManager implem
                                 ByteBufferUtil.string(joined, index + 1, 
joined.limit() - (index + 1))
                             };
     }
+
+    private int waitForSchemaAgreement(InetAddress endpoint) throws 
InterruptedException
+    {
+        Gossiper gossiper = Gossiper.instance;
+        int waited = 0;
+        while 
(!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
+                
gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value))
+        {
+            Thread.sleep(1000);
+            waited += 1000;
+            if (waited > 2 * StorageService.RING_DELAY)
+                throw new RuntimeException("Could not reach schema agreement 
with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+        }
+        logger_.debug("schema for {} matches local schema", endpoint);
+        return waited;
+    }
             
-    private void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
TimeoutException, InterruptedException
     {
+        logger_.info("Checking remote schema before delivering hints to " + 
endpoint);
+        int waited = waitForSchemaAgreement(endpoint);
         logger_.info("Started hinted handoff for endpoint " + endpoint);
+        // sleep a random amount to stagger handoff delivery from different 
replicas.
+        // (if we had to wait, then gossiper randomness took care of that for 
us already.)
+        if (waited == 0)
+            Thread.sleep(new Random().nextInt(60000));
+
         queuedDeliveries.remove(endpoint);
 
         // 1. Get the key of the endpoint we need to handoff

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1066899&r1=1066898&r2=1066899&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
 Thu Feb  3 17:58:46 2011
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.io.SerDeUtils;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.MigrationManager;
@@ -134,6 +135,7 @@ public abstract class Migration
             migration = new RowMutation(Table.SYSTEM_TABLE, 
LAST_MIGRATION_KEY);
             migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), 
ByteBuffer.wrap(UUIDGen.decompose(newVersion)), now);
             migration.apply();
+            
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.migration(newVersion));
 
             // if we fail here, there will be schema changes in the CL that 
will get replayed *AFTER* the schema is loaded.
             // CassandraDaemon checks for this condition (the stored version 
will be greater than the loaded version)

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1066899&r1=1066898&r2=1066899&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
 Thu Feb  3 17:58:46 2011
@@ -874,6 +874,8 @@ public class Gossiper implements IFailur
             localState = new EndpointState(hbState);
             localState.isAlive(true);
             localState.isAGossiper(true);
+            if (!StorageService.instance.isClientMode())
+                localState.addApplicationState(ApplicationState.SCHEMA, 
StorageService.instance.valueFactory.migration(DatabaseDescriptor.getDefsVersion()));
             endpointStateMap_.put(localEndpoint_, localState);
         }
 


Reply via email to