merge from 1.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d90eaf5b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d90eaf5b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d90eaf5b Branch: refs/heads/trunk Commit: d90eaf5b05a98ad090746424dcd507a915c55ca9 Parents: 6b29ab7 846b140 Author: Pavel Yaskevich <[email protected]> Authored: Wed Sep 5 00:17:00 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Wed Sep 5 00:17:00 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/DefsTable.java | 77 +++++++++++++++- src/java/org/apache/cassandra/db/SystemTable.java | 1 + 3 files changed, 78 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d90eaf5b/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d90eaf5b/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d90eaf5b/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemTable.java index 23d9269,3256b91..cd64d2c --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@@ -86,62 -83,66 +86,63 @@@ public class SystemTabl public static void finishStartup() throws IOException { + DefsTable.fixSchemaNanoTimestamps(); setupVersion(); - purgeIncompatibleHints(); + try + { + upgradeSystemData(); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } - private static void setupVersion() throws IOException + private static void setupVersion() { - RowMutation rm; - ColumnFamily cf; - - rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes("build")); - cf = ColumnFamily.create(Table.SYSTEM_TABLE, VERSION_CF); - cf.addColumn(new Column(ByteBufferUtil.bytes("version"), ByteBufferUtil.bytes(FBUtilities.getReleaseVersionString()), FBUtilities.timestampMicros())); - rm.add(cf); - rm.apply(); - - rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes("cql")); - cf = ColumnFamily.create(Table.SYSTEM_TABLE, VERSION_CF); - cf.addColumn(new Column(ByteBufferUtil.bytes("version"), ByteBufferUtil.bytes(QueryProcessor.CQL_VERSION.toString()), FBUtilities.timestampMicros())); - rm.add(cf); - rm.apply(); - - rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes("thrift")); - cf = ColumnFamily.create(Table.SYSTEM_TABLE, VERSION_CF); - cf.addColumn(new Column(ByteBufferUtil.bytes("version"), ByteBufferUtil.bytes(Constants.VERSION), FBUtilities.timestampMicros())); - rm.add(cf); - rm.apply(); + String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version) VALUES ('%s', '%s', '%s', '%s')"; + processInternal(String.format(req, LOCAL_CF, + LOCAL_KEY, + FBUtilities.getReleaseVersionString(), + QueryProcessor.CQL_VERSION.toString(), + Constants.VERSION)); } - /** if hints become incompatible across versions of cassandra, that logic (and associated purging) is managed here. */ - private static void purgeIncompatibleHints() throws IOException + /** if system data becomes incompatible across versions of cassandra, that logic (and associated purging) is managed here */ + private static void upgradeSystemData() throws IOException, ExecutionException, InterruptedException { - ByteBuffer upgradeMarker = ByteBufferUtil.bytes("Pre-1.0 hints purged"); - Table table = Table.open(Table.SYSTEM_TABLE); - QueryFilter filter = QueryFilter.getNamesFilter(decorate(COOKIE_KEY), new QueryPath(STATUS_CF), upgradeMarker); - ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter); - if (cf != null) + Table table = Table.open(Table.SYSTEM_KS); + ColumnFamilyStore oldStatusCfs = table.getColumnFamilyStore(OLD_STATUS_CF); + if (oldStatusCfs.getSSTables().size() > 0) { - logger.debug("Pre-1.0 hints already purged"); - return; + SortedSet<ByteBuffer> cols = new TreeSet<ByteBuffer>(BytesType.instance); + cols.add(ByteBufferUtil.bytes("ClusterName")); + cols.add(ByteBufferUtil.bytes("Token")); + QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes("L")), new QueryPath(OLD_STATUS_CF), cols); + ColumnFamily oldCf = oldStatusCfs.getColumnFamily(filter); + Iterator<IColumn> oldColumns = oldCf.columns.iterator(); + + String clusterName = ByteBufferUtil.string(oldColumns.next().value()); + // serialize the old token as a collection of (one )tokens. + Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value()); + String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token))); + // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason) + String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', '%s')"; + processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes, BootstrapState.COMPLETED.name())); + + oldStatusCfs.truncate(); } - // marker not found. Snapshot + remove hints and add the marker - ColumnFamilyStore hintsCfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HintedHandOffManager.HINTS_CF); - if (hintsCfs.getSSTables().size() > 0) + ColumnFamilyStore oldHintsCfs = table.getColumnFamilyStore(OLD_HINTS_CF); + if (oldHintsCfs.getSSTables().size() > 0) { logger.info("Possible old-format hints found. Truncating"); - try - { - hintsCfs.truncate(); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + oldHintsCfs.truncate(); } - logger.debug("Marking pre-1.0 hints purged"); - RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, COOKIE_KEY); - rm.add(new QueryPath(STATUS_CF, null, upgradeMarker), ByteBufferUtil.bytes("oh yes, they were purged"), FBUtilities.timestampMicros()); - rm.apply(); } /**
