fix broken system.schema_* timestamps on system startup patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-4561
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/846b1401 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/846b1401 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/846b1401 Branch: refs/heads/trunk Commit: 846b14019de484e9bf36e8907c48d3518f09e32f Parents: 5df7b25 Author: Pavel Yaskevich <[email protected]> Authored: Wed Aug 29 14:12:03 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Wed Sep 5 00:15:43 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/DefsTable.java | 77 +++++++++++++++- src/java/org/apache/cassandra/db/SystemTable.java | 1 + 3 files changed, 78 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/846b1401/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6125c9c..75e856c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ * Log(info) schema changes (CASSANDRA-4547) * Change nodetool setcachecapcity to manipulate global caches (CASSANDRA-4563) * (cql3) fix setting compaction strategy (CASSANDRA-4597) + * fix broken system.schema_* timestamps on system startup (CASSANDRA-4561) Merged from 1.0: * increase Xss to 160k to accomodate latest 1.6 JVMs (CASSANDRA-4602) * fix toString of hint destination tokens (CASSANDRA-4568) http://git-wip-us.apache.org/repos/asf/cassandra/blob/846b1401/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index 93f9867..2e4e5d3 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.slf4j.Logger; @@ -152,7 +153,7 @@ public class DefsTable for (Row row : serializedSchema) { - if (row.cf == null || (row.cf.isMarkedForDelete() && row.cf.isEmpty())) + if (invalidSchemaRow(row)) continue; keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key))); @@ -161,6 +162,80 @@ public class DefsTable return keyspaces; } + public static void fixSchemaNanoTimestamps() throws IOException + { + fixSchemaNanoTimestamp(SystemTable.SCHEMA_KEYSPACES_CF); + fixSchemaNanoTimestamp(SystemTable.SCHEMA_COLUMNFAMILIES_CF); + fixSchemaNanoTimestamp(SystemTable.SCHEMA_COLUMNS_CF); + } + + private static void fixSchemaNanoTimestamp(String columnFamily) throws IOException + { + ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(columnFamily); + + boolean needsCleanup = false; + long timestamp = FBUtilities.timestampMicros(); + + List<Row> rows = SystemTable.serializedSchema(columnFamily); + + row_check_loop: + for (Row row : rows) + { + if (invalidSchemaRow(row)) + continue; + + for (IColumn column : row.cf.columns) + { + if (column.timestamp() > timestamp) + { + needsCleanup = true; + // exit the loop on first found timestamp mismatch as we know that it + // wouldn't be only one column/row that we would have to fix anyway + break row_check_loop; + } + } + } + + if (!needsCleanup) + return; + + logger.info("Fixing timestamps of schema ColumnFamily " + columnFamily + "..."); + + try + { + cfs.truncate().get(); + } + catch (ExecutionException e) + { + throw new RuntimeException(e); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + for (Row row : rows) + { + if (invalidSchemaRow(row)) + continue; + + RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, row.key.key); + + for (IColumn column : row.cf.columns) + { + if (column.isLive()) + mutation.add(new QueryPath(columnFamily, null, column.name()), column.value(), timestamp); + } + + mutation.apply(); + } + } + + private static boolean invalidSchemaRow(Row row) + { + return row.cf == null || (row.cf.isMarkedForDelete() && row.cf.isEmpty()); + } + public static ByteBuffer searchComposite(String name, boolean start) { assert name != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/846b1401/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index af07156..3256b91 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -83,6 +83,7 @@ public class SystemTable public static void finishStartup() throws IOException { + DefsTable.fixSchemaNanoTimestamps(); setupVersion(); purgeIncompatibleHints(); }
