Updated Branches: refs/heads/cassandra-1.1 5030f78a5 -> e376bc02e
add a convenient way to reset a node's schema patch by Yuki Morishita and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-2963 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e376bc02 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e376bc02 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e376bc02 Branch: refs/heads/cassandra-1.1 Commit: e376bc02eca6f75489d2aff41e0cc36105eb8567 Parents: 5030f78 Author: Pavel Yaskevich <[email protected]> Authored: Wed Feb 22 21:54:43 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Wed Feb 22 21:59:31 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Schema.java | 18 ++ src/java/org/apache/cassandra/db/Table.java | 25 +++- .../apache/cassandra/service/MigrationManager.java | 144 +++++++++++---- .../apache/cassandra/service/StorageService.java | 7 +- .../cassandra/service/StorageServiceMBean.java | 4 +- src/java/org/apache/cassandra/tools/NodeCmd.java | 3 + src/java/org/apache/cassandra/tools/NodeProbe.java | 6 +- 8 files changed, 162 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7811b1f..a41bdba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Fix BulkRecordWriter to not throw NPE if reducer gets no map data from Hadoop (CASSANDRA-3944) * Fix bug with counters in super columns (CASSANDRA-3821) * Remove deprecated merge_shard_chance (CASSANDRA-3940) + * add a convenient way to reset a node's schema (CASSANDRA-2963) Merged from 1.0: * remove the wait on hint future during write (CASSANDRA-3870) * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 89c5d1e..d27b347 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -479,6 +479,7 @@ public class Schema versionLock.writeLock().unlock(); } } + /* * Like updateVersion, but also announces via gossip */ @@ -487,4 +488,21 @@ public class Schema updateVersion(); MigrationManager.passiveAnnounce(version); } + + /** + * Clear all KS/CF metadata and reset version. + */ + public synchronized void clear() + { + for (String table : getNonSystemTables()) + { + KSMetaData ksm = getTableDefinition(table); + for (CFMetaData cfm : ksm.cfMetaData().values()) + purge(cfm); + clearTableDefinition(ksm); + } + + updateVersionAndAnnounce(); + fixCFMaxId(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index afe9a65..22285be 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -47,6 +47,7 @@ import org.apache.cassandra.io.util.MmappedSegmentedFile; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.NodeId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -334,9 +335,27 @@ public class Table /** adds a cf to internal structures, ends up creating disk files). 
*/ public void initCf(Integer cfId, String cfName) { - assert !columnFamilyStores.containsKey(cfId) : String.format("tried to init %s as %s, but already used by %s", - cfName, cfId, columnFamilyStores.get(cfId)); - columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName)); + if (columnFamilyStores.containsKey(cfId)) + { + // this is the case when you reset local schema + // just reload metadata + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); + assert cfs.getColumnFamilyName().equals(cfName); + + try + { + cfs.metadata.reload(); + cfs.reload(); + } + catch (IOException e) + { + throw FBUtilities.unchecked(e); + } + } + else + { + columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName)); + } } public Row getRow(QueryFilter filter) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index d303d40..b37fc2b 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -23,9 +23,8 @@ import java.io.DataOutputStream; import java.io.IOError; import java.io.IOException; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,8 +36,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.DefsTable; -import 
org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.*; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FastByteArrayOutputStream; @@ -96,41 +94,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are * running in the gossip stage. */ - - StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() - { - public void runMayThrow() throws Exception - { - Message message = new Message(FBUtilities.getBroadcastAddress(), - StorageService.Verb.MIGRATION_REQUEST, - ArrayUtils.EMPTY_BYTE_ARRAY, - Gossiper.instance.getVersion(endpoint)); - - int retries = 0; - while (retries < MIGRATION_REQUEST_RETRIES) - { - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.error("Can't send migration request: node {} is down.", endpoint); - return; - } - - IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint); - - try - { - byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - - DefsTable.mergeRemoteSchema(reply, message.getVersion()); - return; - } - catch(TimeoutException e) - { - retries++; - } - } - } - }); + StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); } public static boolean isReadyForBootstrap() @@ -244,4 +208,104 @@ public class MigrationManager implements IEndpointStateChangeSubscriber return schema; } + + /** + * Clear all locally stored schema information and reset schema to initial state. + * Called by user (via JMX) who wants to get rid of schema disagreement. 
+ * + * @throws IOException if schema tables truncation fails + */ + public static void resetLocalSchema() throws IOException + { + logger.info("Starting local schema reset..."); + + try + { + if (logger.isDebugEnabled()) + logger.debug("Truncating schema tables..."); + + // truncate schema tables + FBUtilities.waitOnFutures(new ArrayList<Future<?>>() + {{ + SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncate(); + SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncate(); + SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate(); + }}); + + if (logger.isDebugEnabled()) + logger.debug("Clearing local schema keyspace definitions..."); + + Schema.instance.clear(); + + Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + + // force migration if there are nodes around, first of all + // check if there are nodes with versions >= 1.1 to request migrations from, + // because migration format of the nodes with versions < 1.1 is incompatible with newer versions + for (InetAddress node : liveEndpoints) + { + if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_11) + { + if (logger.isDebugEnabled()) + logger.debug("Requesting schema from " + node); + + FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(node))); + break; + } + } + + logger.info("Local schema reset is complete."); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + } + + static class MigrationTask extends WrappedRunnable + { + private final InetAddress endpoint; + + MigrationTask(InetAddress endpoint) + { + this.endpoint = endpoint; + } + + public void runMayThrow() throws Exception + { + Message message = new Message(FBUtilities.getBroadcastAddress(), + StorageService.Verb.MIGRATION_REQUEST, + ArrayUtils.EMPTY_BYTE_ARRAY, + 
Gossiper.instance.getVersion(endpoint)); + + int retries = 0; + while (retries < MIGRATION_REQUEST_RETRIES) + { + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.error("Can't send migration request: node {} is down.", endpoint); + return; + } + + IAsyncResult iar = MessagingService.instance().sendRR(message, endpoint); + + try + { + byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + + DefsTable.mergeRemoteSchema(reply, message.getVersion()); + return; + } + catch(TimeoutException e) + { + retries++; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7a7bc7c..a0e99ee 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2955,7 +2955,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { ColumnFamilyStore.loadNewSSTables(ksName, cfName); } - + /** * #{@inheritDoc} */ @@ -2973,4 +2973,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { ColumnFamilyStore.rebuildSecondaryIndex(ksName, cfName, idxNames); } + + public void resetLocalSchema() throws IOException + { + MigrationManager.resetLocalSchema(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 4f589d7..a3cbc7b 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -374,7 +374,7 @@ public interface StorageServiceMBean * @param cfName The ColumnFamily name where SSTables belong */ public void loadNewSSTables(String ksName, String cfName); - + /** * Return a List of Tokens representing a sample of keys * across all ColumnFamilyStores @@ -387,4 +387,6 @@ public interface StorageServiceMBean * rebuild the specified indexes */ public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames); + + public void resetLocalSchema() throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 198e659..04324f2 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -121,6 +121,7 @@ public class NodeCmd DESCRIBERING, RANGEKEYSAMPLE, REBUILD_INDEX, + RESETLOCALSCHEMA } @@ -150,6 +151,7 @@ public class NodeCmd addCmdHelp(header, "gossipinfo", "Shows the gossip information for the cluster"); addCmdHelp(header, "invalidatekeycache", "Invalidate the key cache"); addCmdHelp(header, "invalidaterowcache", "Invalidate the row cache"); + addCmdHelp(header, "resetlocalschema", "Reset node's local schema and resync"); // One arg addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)"); @@ -682,6 +684,7 @@ public class NodeCmd case DISABLETHRIFT : probe.stopThriftServer(); break; case ENABLETHRIFT : probe.startThriftServer(); break; case STATUSTHRIFT : nodeCmd.printIsThriftServerRunning(System.out); break; + case RESETLOCALSCHEMA: probe.resetLocalSchema(); break; case DRAIN : try { probe.drain(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 8b0d668..fc0c21c 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -653,12 +653,16 @@ public class NodeProbe { ssProxy.rebuild(sourceDc); } - + public List<String> getRangeKeySample() { return ssProxy.getRangeKeySample(); } + public void resetLocalSchema() throws IOException + { + ssProxy.resetLocalSchema(); + } } class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
