Updated Branches: refs/heads/cassandra-1.2 88bc4f237 -> 65daaf9d6
Future-proof inter-major-version schema migrations patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-5845 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65daaf9d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65daaf9d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65daaf9d Branch: refs/heads/cassandra-1.2 Commit: 65daaf9d6ae90c1cd01cdbe23e36998411f0f2ed Parents: 88bc4f2 Author: Aleksey Yeschenko <[email protected]> Authored: Sun Aug 4 00:58:28 2013 +0200 Committer: Aleksey Yeschenko <[email protected]> Committed: Sun Aug 4 00:58:28 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/service/MigrationManager.java | 50 ++++++++++---------- 2 files changed, 25 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8eff1fd..960a8c2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * (Hadoop) fix support for Thrift tables in CqlPagingRecordReader (CASSANDRA-5752) * add "all time blocked" to StatusLogger output (CASSANDRA-5825) + * Future-proof inter-major-version schema migrations (CASSANDRA-5845) 1.2.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/65daaf9d/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index de34785..9aa6f22 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -110,14 +110,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber */ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint) { - // Can't request migrations from nodes with versions younger than 1.1.7 - if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117) - return; - - if (Gossiper.instance.isFatClient(endpoint)) - return; - - if (Schema.instance.getVersion().equals(theirVersion)) + if (Schema.instance.getVersion().equals(theirVersion) || !shouldPullSchemaFrom(endpoint)) return; if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) @@ -146,13 +139,25 @@ public class MigrationManager implements IEndpointStateChangeSubscriber } } - private static void submitMigrationTask(InetAddress endpoint) + private static Future<?> submitMigrationTask(InetAddress endpoint) { /* * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are * running in the gossip stage. */ - StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); + } + + private static boolean shouldPullSchemaFrom(InetAddress endpoint) + { + /* + * Don't request schema from nodes with versions younger than 1.1.7 (timestamps in versions prior to 1.1.7 are broken) + * Don't request schema from nodes with a higher major (may have incompatible schema) + * Don't request schema from fat clients + */ + return MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_117 + && MessagingService.instance().getVersion(endpoint) <= MessagingService.current_version + && !Gossiper.instance.isFatClient(endpoint); } public static boolean isReadyForBootstrap() @@ -303,10 +308,10 @@ public class MigrationManager implements IEndpointStateChangeSubscriber for (InetAddress endpoint : Gossiper.instance.getLiveMembers()) { if (endpoint.equals(FBUtilities.getBroadcastAddress())) - continue; // we've delt with localhost already + continue; // we've dealt with localhost already - // don't send migrations to the nodes with the versions older than < 1.2 - if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_12) + // don't send schema to the nodes with the versions older than current major + if (MessagingService.instance().getVersion(endpoint) < MessagingService.current_version) continue; pushSchemaMutation(endpoint, schema); @@ -338,8 +343,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber try { - if (logger.isDebugEnabled()) - logger.debug("Truncating schema tables..."); + logger.debug("Truncating schema tables..."); // truncate schema tables FBUtilities.waitOnFutures(new ArrayList<Future<?>>(3) @@ -349,26 +353,20 @@ public class MigrationManager implements IEndpointStateChangeSubscriber SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate(); }}); - if (logger.isDebugEnabled()) - logger.debug("Clearing local schema keyspace definitions..."); + logger.debug("Clearing local schema keyspace definitions..."); Schema.instance.clear(); Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); liveEndpoints.remove(FBUtilities.getBroadcastAddress()); - // force migration is there are nodes around, first of all - // check if there are nodes with versions >= 1.1.7 to request migrations from, - // because migration format of the nodes with versions < 1.1 is incompatible with older versions - // and due to broken timestamps in versions prior to 1.1.7 + // force migration if there are nodes around for (InetAddress node : liveEndpoints) { - if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117) + if (shouldPullSchemaFrom(node)) { - if (logger.isDebugEnabled()) - logger.debug("Requesting schema from " + node); - - FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node))); + logger.debug("Requesting schema from {}", node); + FBUtilities.waitOnFuture(submitMigrationTask(node)); break; } }
