Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 6770281af -> 1f6bf3651
Fix PITR commitlog replay patch by Branimir Lambov; reviewed by Jeremiah Jordan for CASSANDRA-9195 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f6bf365 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f6bf365 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f6bf365 Branch: refs/heads/cassandra-2.1 Commit: 1f6bf3651a76f0ae2c61ee7fd994573f249e78d5 Parents: 6770281 Author: Branimir Lambov <[email protected]> Authored: Thu Apr 30 18:39:19 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Thu Apr 30 18:40:23 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../db/commitlog/CommitLogReplayer.java | 44 ++++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f6bf365/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 481fb80..a01e8ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.6 + * Fix PITR commitlog replay (CASSANDRA-9195) * GCInspector logs very different times (CASSANDRA-9124) * Fix deleting from an empty list (CASSANDRA-9198) * Update tuple and collection types that use a user-defined type when that UDT @@ -8,6 +9,7 @@ Merged from 2.0: * IncomingTcpConnection thread is not named (CASSANDRA-9262) * Close incoming connections when MessagingService is stopped (CASSANDRA-9238) + 2.1.5 * Re-add deprecated cold_reads_to_omit param for backwards compat (CASSANDRA-9203) * Make anticompaction visible in compactionstats (CASSANDRA-9098) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f6bf365/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 63afcbc..57f4b90 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -34,13 +34,13 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.*; - import org.cliffc.high_scale_lib.NonBlockingHashSet; public class CommitLogReplayer @@ -58,6 +58,8 @@ public class CommitLogReplayer private final PureJavaCrc32 checksum; private byte[] buffer; + private final ReplayFilter replayFilter; + public CommitLogReplayer() { this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>(); @@ -68,6 +70,8 @@ public class CommitLogReplayer this.replayedCount = new AtomicInteger(); this.checksum = new PureJavaCrc32(); + replayFilter = ReplayFilter.create(); + // compute per-CF and global replay positions cfPositions = new HashMap<UUID, ReplayPosition>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); @@ -81,7 +85,27 @@ public class CommitLogReplayer // but, if we've truncted the cf in question, then we need to need to start replay after the truncation ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId); if (truncatedAt != null) - rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); + { + // Point in time restore is taken to mean that the tables need to be recovered even if they were + // deleted at a later point in time. Any truncation record after that point must thus be cleared prior + // to recovery (CASSANDRA-9195). + long restoreTime = CommitLog.instance.archiver.restorePointInTime; + long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId); + if (truncatedTime > restoreTime) + { + if (replayFilter.includes(cfs.metadata)) + { + logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.", + cfs.metadata.ksName, + cfs.metadata.cfName); + SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId); + } + } + else + { + rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); + } + } cfPositions.put(cfs.metadata.cfId, rp); } @@ -167,6 +191,8 @@ public class CommitLogReplayer { public abstract Iterable<ColumnFamily> filter(Mutation mutation); + public abstract boolean includes(CFMetaData metadata); + public static ReplayFilter create() { // If no replaylist is supplied an empty array of strings is used to replay everything. @@ -183,7 +209,8 @@ public class CommitLogReplayer Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]); if (ks == null) throw new IllegalArgumentException("Unknown keyspace " + pair[0]); - if (ks.getColumnFamilyStore(pair[1]) == null) + ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]); + if (cfs == null) throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1])); toReplay.put(pair[0], pair[1]); @@ -198,6 +225,11 @@ public class CommitLogReplayer { return mutation.getColumnFamilies(); } + + public boolean includes(CFMetaData metadata) + { + return true; + } } private static class CustomReplayFilter extends ReplayFilter @@ -223,11 +255,15 @@ public class CommitLogReplayer } }); } + + public boolean includes(CFMetaData metadata) + { + return toReplay.containsEntry(metadata.ksName, metadata.cfName); + } } public void recover(File file) throws IOException { - final ReplayFilter replayFilter = ReplayFilter.create(); logger.info("Replaying {}", file.getPath()); CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); final long segmentId = desc.id;
