Author: jbellis
Date: Thu May 26 20:47:16 2011
New Revision: 1128074

URL: http://svn.apache.org/viewvc?rev=1128074&view=rev
Log:
throttle migration replay
patch by jbellis; reviewed by gdusbabek for CASSANDRA-2714
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1128074&r1=1128073&r2=1128074&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu May 26 20:47:16 2011 @@ -10,6 +10,7 @@ * remove no-op HHOM.renameHints (CASSANDRA-2693) * clone super columns to avoid modifying them during flush (CASSANDRA-2675) * close scrub file handles (CASSANDRA-2669) + * throttle migration replay (CASSANDRA-2714) 0.7.6 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1128074&r1=1128073&r2=1128074&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java Thu May 26 20:47:16 2011 @@ -298,7 +298,12 @@ public abstract class Migration DecoratedKey dkey = StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY); Table defs = Table.open(Table.SYSTEM_TABLE); ColumnFamilyStore cfStore = defs.getColumnFamilyStore(Migration.MIGRATIONS_CF); - QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(MIGRATIONS_CF), ByteBuffer.wrap(UUIDGen.decompose(start)), ByteBuffer.wrap(UUIDGen.decompose(end)), false, 1000); + QueryFilter filter = QueryFilter.getSliceFilter(dkey, + new QueryPath(MIGRATIONS_CF), + 
ByteBuffer.wrap(UUIDGen.decompose(start)), + ByteBuffer.wrap(UUIDGen.decompose(end)), + false, + 100); ColumnFamily cf = cfStore.getColumnFamily(filter); return cf.getSortedColumns(); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1128074&r1=1128073&r2=1128074&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java Thu May 26 20:47:16 2011 @@ -24,8 +24,10 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import org.apache.cassandra.utils.ByteBufferUtil; +import com.google.common.collect.Iterables; +import com.google.common.collect.MapMaker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +37,21 @@ import org.apache.cassandra.config.Confi import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; +import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.gms.*; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; public class MigrationManager implements IEndpointStateChangeSubscriber { private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); - + + // avoids re-pushing migrations that we're waiting on target to apply already + private static Map<InetAddress,UUID> lastPushed = new 
MapMaker().expiration(1, TimeUnit.MINUTES).makeMap(); + /** I'm not going to act here. */ public void onJoin(InetAddress endpoint, EndpointState epState) { } @@ -87,8 +94,16 @@ public class MigrationManager implements } else if (!StorageService.instance.isClientMode()) { - logger.debug("Their data definitions are old. Sending updates since {}", theirVersion.toString()); - pushMigrations(theirVersion, myVersion, endpoint); + if (lastPushed.get(endpoint) == null || theirVersion.timestamp() >= lastPushed.get(endpoint).timestamp()) + { + logger.debug("Schema on {} is old. Sending updates since {}", endpoint, theirVersion); + pushMigrations(theirVersion, myVersion, endpoint); + } + else + { + logger.debug("Waiting for {} to process migrations up to {} before sending more", + endpoint, lastPushed.get(endpoint)); + } } } @@ -172,6 +187,7 @@ public class MigrationManager implements { Message msg = makeMigrationMessage(migrations); MessagingService.instance().sendOneWay(msg, host); + lastPushed.put(host, TimeUUIDType.instance.compose(Iterables.getLast(migrations).name())); } catch (IOException ex) {