merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f01166a5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f01166a5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f01166a5 Branch: refs/heads/trunk Commit: f01166a5358777a6d749d488fdcee367f4b1a949 Parents: c2bd0d8 66df206 Author: Jonathan Ellis <jbel...@apache.org> Authored: Thu Nov 7 13:54:41 2013 -0600 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Thu Nov 7 13:54:41 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/CommitLogReplayer.java | 101 ++++++++++++++++--- 2 files changed, 89 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f01166a5/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 4b1f030,6ff2799..50956f5 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,17 -1,5 +1,18 @@@ +2.1 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337) + * change logging from log4j to logback (CASSANDRA-5883) + * switch to LZ4 compression for internode communication (CASSANDRA-5887) + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971) + * Remove 1.2 network compatibility code (CASSANDRA-5960) + * Remove leveled json manifest migration code (CASSANDRA-5996) + * Remove CFDefinition (CASSANDRA-6253) + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278) + * User-defined types for CQL3 (CASSANDRA-5590) + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871) + + 2.0.3 + * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809) * Avoid flushing compaction_history after each operation (CASSANDRA-6287) * Fix repair assertion error when tombstones expire (CASSANDRA-6277) * Skip loading corrupt key cache (CASSANDRA-6260) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f01166a5/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 532736b,74daecb..c9659c4 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@@ -110,9 -114,73 +114,72 @@@ public class CommitLogReplaye return replayedCount.get(); } + private abstract static class ReplayFilter + { + public abstract Iterable<ColumnFamily> filter(RowMutation rm); + + public static ReplayFilter create() + { + // If no replaylist is supplied an empty array of strings is used to replay everything. + if (System.getProperty("cassandra.replayList") == null) + return new AlwaysReplayFilter(); + + Multimap<String, String> toReplay = HashMultimap.create(); + for (String rawPair : System.getProperty("cassandra.replayList").split(",")) + { + String[] pair = rawPair.trim().split("."); + if (pair.length != 2) + throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'"); + + Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]); + if (ks == null) + throw new IllegalArgumentException("Unknown keyspace " + pair[0]); + if (ks.getColumnFamilyStore(pair[1]) == null) + throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1])); + + toReplay.put(pair[0], pair[1]); + } + return new CustomReplayFilter(toReplay); + } + } + + private static class AlwaysReplayFilter extends ReplayFilter + { + public Iterable<ColumnFamily> filter(RowMutation rm) + { + return rm.getColumnFamilies(); + } + } + + private static class CustomReplayFilter extends ReplayFilter + { + private Multimap<String, String> toReplay; + + public CustomReplayFilter(Multimap<String, String> toReplay) + { + this.toReplay = toReplay; + } + + public Iterable<ColumnFamily> filter(RowMutation rm) + { + final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName()); + if (cfNames == null) + return Collections.emptySet(); + + return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>() + { + public boolean apply(ColumnFamily cf) + { + return cfNames.contains(cf.metadata().cfName); + } + }); + } + } + public void recover(File file) throws IOException { + final ReplayFilter replayFilter = ReplayFilter.create(); - - logger.info("Replaying " + file.getPath()); + logger.info("Replaying {}", file.getPath()); CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); final long segment = desc.id; int version = desc.getMessagingVersion(); @@@ -239,10 -307,10 +306,9 @@@ } if (logger.isDebugEnabled()) - logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") - + "}")); + logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"); final long entryLocation = reader.getFilePointer(); - final RowMutation frm = rm; Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws IOException