Author: brandonwilliams
Date: Thu Feb 3 23:33:51 2011
New Revision: 1067032
URL: http://svn.apache.org/viewvc?rev=1067032&view=rev Log: Fix race between HH and schema changes. Patch by brandonwilliams, reviewed by gdusbabek for CASSANDRA-2083 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067032&r1=1067031&r2=1067032&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Thu Feb 3 23:33:51 2011 @@ -72,6 +72,8 @@ public class DefinitionsUpdateResponseVe try { m.apply(); + // update gossip, but don't contact nodes directly + m.passiveAnnounce(); } catch (ConfigurationException ex) { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067032&r1=1067031&r2=1067032&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Feb 3 23:33:51 2011 @@ -28,6 +28,8 @@ import 
java.util.concurrent.ExecutorServ import java.util.concurrent.TimeoutException; import static com.google.common.base.Charsets.UTF_8; + +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +44,7 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; @@ -233,10 +236,41 @@ public class HintedHandOffManager implem ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) }; } + + private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException + { + Gossiper gossiper = Gossiper.instance; + int waited = 0; + while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals( + gossiper.getEndpointStateForEndpoint(FBUtilities.getLocalAddress()).getApplicationState(ApplicationState.SCHEMA).value)) + { + Thread.sleep(1000); + waited += 1000; + if (waited > 2 * StorageService.RING_DELAY) + throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms"); + } + logger_.debug("schema for {} matches local schema", endpoint); + return waited; + } - private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException + private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException { + logger_.info("Checking remote schema before delivering hints"); + int waited = waitForSchemaAgreement(endpoint); + // sleep a random amount to stagger handoff delivery from different 
replicas. + // (if we had to wait, then gossiper randomness took care of that for us already.) + if (waited == 0) { + int sleep = new Random().nextInt(60000); + logger_.info("Sleeping {}ms to stagger hint delivery", sleep); + Thread.sleep(sleep); + } + if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive()) + { + logger_.info("Endpoint {} died before hint delivery, aborting", endpoint); + return; + } logger_.info("Started hinted handoff for endpoint " + endpoint); + queuedDeliveries.remove(endpoint); // 1. Get the key of the endpoint we need to handoff Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067032&r1=1067031&r2=1067032&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu Feb 3 23:33:51 2011 @@ -175,10 +175,15 @@ public abstract class Migration if (StorageService.instance.isClientMode()) return; - // immediate notification for esiting nodes. + // immediate notification for existing nodes. 
MigrationManager.announce(newVersion, Gossiper.instance.getLiveMembers()); } - + + public final void passiveAnnounce() + { + MigrationManager.passiveAnnounce(newVersion); + } + public static UUID getLastMigrationId() { DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY); Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067032&r1=1067031&r2=1067032&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Thu Feb 3 23:33:51 2011 @@ -89,15 +89,21 @@ public class MigrationManager implements } } - /** announce my version to a set of hosts. They may culminate with them sending me migrations. */ + /** actively announce my version to a set of hosts via rpc. They may culminate with them sending me migrations. */ public static void announce(UUID version, Set<InetAddress> hosts) { Message msg = makeVersionMessage(version); for (InetAddress host : hosts) MessagingService.instance().sendOneWay(msg, host); - // this is for notifying nodes as they arrive in the cluster. 
+ passiveAnnounce(version); + } + + /** announce my version passively over gossip **/ + public static void passiveAnnounce(UUID version) + { if (!StorageService.instance.isClientMode()) Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version)); + logger.debug("Announcing my schema is " + version); } /** @@ -152,6 +158,7 @@ public class MigrationManager implements throw new IOException(e); } } + passiveAnnounce(to); // we don't need to send rpcs, but we need to update gossip } /** pushes migrations from this host to another host */
