Author: jbellis
Date: Thu May 26 20:47:16 2011
New Revision: 1128074
URL: http://svn.apache.org/viewvc?rev=1128074&view=rev
Log:
throttle migration replay
patch by jbellis; reviewed by gdusbabek for CASSANDRA-2714
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu May 26 20:47:16 2011
@@ -10,6 +10,7 @@
* remove no-op HHOM.renameHints (CASSANDRA-2693)
* clone super columns to avoid modifying them during flush (CASSANDRA-2675)
* close scrub file handles (CASSANDRA-2669)
+ * throttle migration replay (CASSANDRA-2714)
0.7.6
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/migration/Migration.java
Thu May 26 20:47:16 2011
@@ -298,7 +298,12 @@ public abstract class Migration
DecoratedKey dkey =
StorageService.getPartitioner().decorateKey(MIGRATIONS_KEY);
Table defs = Table.open(Table.SYSTEM_TABLE);
ColumnFamilyStore cfStore =
defs.getColumnFamilyStore(Migration.MIGRATIONS_CF);
- QueryFilter filter = QueryFilter.getSliceFilter(dkey, new
QueryPath(MIGRATIONS_CF), ByteBuffer.wrap(UUIDGen.decompose(start)),
ByteBuffer.wrap(UUIDGen.decompose(end)), false, 1000);
+ QueryFilter filter = QueryFilter.getSliceFilter(dkey,
+ new
QueryPath(MIGRATIONS_CF),
+
ByteBuffer.wrap(UUIDGen.decompose(start)),
+
ByteBuffer.wrap(UUIDGen.decompose(end)),
+ false,
+ 100);
ColumnFamily cf = cfStore.getColumnFamily(filter);
return cf.getSortedColumns();
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1128074&r1=1128073&r2=1128074&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/MigrationManager.java
Thu May 26 20:47:16 2011
@@ -24,8 +24,10 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,16 +37,21 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
public class MigrationManager implements IEndpointStateChangeSubscriber
{
private static final Logger logger =
LoggerFactory.getLogger(MigrationManager.class);
-
+
+ // avoids re-pushing migrations that we're already waiting on the target to apply
+ private static Map<InetAddress,UUID> lastPushed = new
MapMaker().expiration(1, TimeUnit.MINUTES).makeMap();
+
/** I'm not going to act here. */
public void onJoin(InetAddress endpoint, EndpointState epState) { }
@@ -87,8 +94,16 @@ public class MigrationManager implements
}
else if (!StorageService.instance.isClientMode())
{
- logger.debug("Their data definitions are old. Sending updates
since {}", theirVersion.toString());
- pushMigrations(theirVersion, myVersion, endpoint);
+ if (lastPushed.get(endpoint) == null || theirVersion.timestamp()
>= lastPushed.get(endpoint).timestamp())
+ {
+ logger.debug("Schema on {} is old. Sending updates since {}",
endpoint, theirVersion);
+ pushMigrations(theirVersion, myVersion, endpoint);
+ }
+ else
+ {
+ logger.debug("Waiting for {} to process migrations up to {}
before sending more",
+ endpoint, lastPushed.get(endpoint));
+ }
}
}
@@ -172,6 +187,7 @@ public class MigrationManager implements
{
Message msg = makeMigrationMessage(migrations);
MessagingService.instance().sendOneWay(msg, host);
+ lastPushed.put(host,
TimeUUIDType.instance.compose(Iterables.getLast(migrations).name()));
}
catch (IOException ex)
{