Updated Branches: refs/heads/cassandra-2.0 0e9906e6d -> 66df206d8 refs/heads/trunk c2bd0d8d1 -> f01166a53
Allow restoring specific columnfamilies from archived CL patch by Carl Yeksigian, Lyuben Todorov, and jbellis for CASSANDRA-4809 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66df206d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66df206d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66df206d Branch: refs/heads/cassandra-2.0 Commit: 66df206d8df74fa9c5380e2ecfd3746fd6b3316e Parents: 0e9906e Author: Jonathan Ellis <[email protected]> Authored: Thu Nov 7 13:53:15 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Thu Nov 7 13:53:35 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/CommitLogReplayer.java | 102 ++++++++++++++++--- 2 files changed, 90 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 77749d6..6ff2799 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.3 + * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809) * Avoid flushing compaction_history after each operation (CASSANDRA-6287) * Fix repair assertion error when tombstones expire (CASSANDRA-6277) * Skip loading corrupt key cache (CASSANDRA-6260) http://git-wip-us.apache.org/repos/asf/cassandra/blob/66df206d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 250e516..74daecb 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -24,6 +24,10 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Checksum; +import com.google.common.base.Predicate; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -110,8 +114,72 @@ public class CommitLogReplayer return replayedCount.get(); } + private abstract static class ReplayFilter + { + public abstract Iterable<ColumnFamily> filter(RowMutation rm); + + public static ReplayFilter create() + { + // If no replaylist is supplied an empty array of strings is used to replay everything. + if (System.getProperty("cassandra.replayList") == null) + return new AlwaysReplayFilter(); + + Multimap<String, String> toReplay = HashMultimap.create(); + for (String rawPair : System.getProperty("cassandra.replayList").split(",")) + { + String[] pair = rawPair.trim().split("."); + if (pair.length != 2) + throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'"); + + Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]); + if (ks == null) + throw new IllegalArgumentException("Unknown keyspace " + pair[0]); + if (ks.getColumnFamilyStore(pair[1]) == null) + throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1])); + + toReplay.put(pair[0], pair[1]); + } + return new CustomReplayFilter(toReplay); + } + } + + private static class AlwaysReplayFilter extends ReplayFilter + { + public Iterable<ColumnFamily> filter(RowMutation rm) + { + return rm.getColumnFamilies(); + } + } + + private static class CustomReplayFilter extends ReplayFilter + { + private Multimap<String, String> toReplay; + + public CustomReplayFilter(Multimap<String, String> toReplay) + { + this.toReplay = toReplay; + } + + public Iterable<ColumnFamily> filter(RowMutation rm) + { + final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName()); + if (cfNames == null) + return Collections.emptySet(); + + return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>() + { + public boolean apply(ColumnFamily cf) + { + return cfNames.contains(cf.metadata().cfName); + } + }); + } + } + public void recover(File file) throws IOException { + final ReplayFilter replayFilter = ReplayFilter.create(); + logger.info("Replaying " + file.getPath()); CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); final long segment = desc.id; @@ -195,7 +263,7 @@ public class CommitLogReplayer /* deserialize the commit log entry */ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize); - RowMutation rm; + final RowMutation rm; try { // assuming version here. We've gone to lengths to make sure what gets written to the CL is in @@ -243,36 +311,36 @@ public class CommitLogReplayer + "}")); final long entryLocation = reader.getFilePointer(); - final RowMutation frm = rm; Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws IOException { - if (Schema.instance.getKSMetaData(frm.getKeyspaceName()) == null) + if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null) return; - if (pointInTimeExceeded(frm)) + if (pointInTimeExceeded(rm)) return; - final Keyspace keyspace = Keyspace.open(frm.getKeyspaceName()); + final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName()); - // Rebuild the row mutation, omitting column families that - // a) have already been flushed, - // b) are part of a cf that was dropped. Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + // Rebuild the row mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. RowMutation newRm = null; - for (ColumnFamily columnFamily : frm.getColumnFamilies()) + for (ColumnFamily columnFamily : replayFilter.filter(rm)) { if (Schema.instance.getCF(columnFamily.id()) == null) - // null means the cf has been dropped - continue; + continue; // dropped ReplayPosition rp = cfPositions.get(columnFamily.id()); - // replay if current segment is newer than last flushed one or, + // replay if current segment is newer than last flushed one or, // if it is the last known segment, if we are after the replay position if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position)) { if (newRm == null) - newRm = new RowMutation(frm.getKeyspaceName(), frm.key()); + newRm = new RowMutation(rm.getKeyspaceName(), rm.key()); newRm.add(columnFamily); replayedCount.incrementAndGet(); } @@ -311,4 +379,12 @@ public class CommitLogReplayer } return false; } + + private ColumnFamily getCFToRecover(String cfName, Collection<ColumnFamily> cfs) + { + for (ColumnFamily cf : cfs) + if (cf.metadata().cfName.equals(cfName)) + return cf; + return null; + } }
