Author: brandonwilliams
Date: Thu Feb 3 17:58:46 2011
New Revision: 1066899
URL: http://svn.apache.org/viewvc?rev=1066899&view=rev Log: Check for schema agreement before delivering hints, add random sleep to avoid DDoSing the node. Patch by brandonwilliams and jbellis, reviewed by brandonwilliams for CASSANDRA-2083 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1066899&r1=1066898&r2=1066899&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Feb 3 17:58:46 2011 @@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorServ import java.util.concurrent.TimeoutException; import static com.google.common.base.Charsets.UTF_8; + +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; @@ -233,10 +236,33 @@ public class HintedHandOffManager implem ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) }; } + + private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException 
+ { + Gossiper gossiper = Gossiper.instance; + int waited = 0; + while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals( + gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value)) + { + Thread.sleep(1000); + waited += 1000; + if (waited > 2 * StorageService.RING_DELAY) + throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); + } + logger_.debug("schema for {} matches local schema", endpoint); + return waited; + } - private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException + private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException { + logger_.info("Checking remote schema before delivering hints to " + endpoint); + int waited = waitForSchemaAgreement(endpoint); logger_.info("Started hinted handoff for endpoint " + endpoint); + // sleep a random amount to stagger handoff delivery from different replicas. + // (if we had to wait, then gossiper randomness took care of that for us already.) + if (waited == 0) + Thread.sleep(new Random().nextInt(60000)); + queuedDeliveries.remove(endpoint); // 1. 
Get the key of the endpoint we need to handoff Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1066899&r1=1066898&r2=1066899&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu Feb 3 17:58:46 2011 @@ -38,6 +38,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.io.SerDeUtils; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.service.MigrationManager; @@ -134,6 +135,7 @@ public abstract class Migration migration = new RowMutation(Table.SYSTEM_TABLE, LAST_MIGRATION_KEY); migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY), ByteBuffer.wrap(UUIDGen.decompose(newVersion)), now); migration.apply(); + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(newVersion)); // if we fail here, there will be schema changes in the CL that will get replayed *AFTER* the schema is loaded. 
// CassandraDaemon checks for this condition (the stored version will be greater than the loaded version) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1066899&r1=1066898&r2=1066899&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Thu Feb 3 17:58:46 2011 @@ -874,6 +874,8 @@ public class Gossiper implements IFailur localState = new EndpointState(hbState); localState.isAlive(true); localState.isAGossiper(true); + if (!StorageService.instance.isClientMode()) + localState.addApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(DatabaseDescriptor.getDefsVersion())); endpointStateMap_.put(localEndpoint_, localState); }
