Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 6770281af -> 1f6bf3651


Fix PITR commitlog replay

patch by Branimir Lambov; reviewed by Jeremiah Jordan for CASSANDRA-9195


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f6bf365
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f6bf365
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f6bf365

Branch: refs/heads/cassandra-2.1
Commit: 1f6bf3651a76f0ae2c61ee7fd994573f249e78d5
Parents: 6770281
Author: Branimir Lambov <[email protected]>
Authored: Thu Apr 30 18:39:19 2015 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Apr 30 18:40:23 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../db/commitlog/CommitLogReplayer.java         | 44 ++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f6bf365/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 481fb80..a01e8ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Fix PITR commitlog replay (CASSANDRA-9195)
  * GCInspector logs very different times (CASSANDRA-9124)
  * Fix deleting from an empty list (CASSANDRA-9198)
  * Update tuple and collection types that use a user-defined type when that UDT
@@ -8,6 +9,7 @@ Merged from 2.0:
  * IncomingTcpConnection thread is not named (CASSANDRA-9262)
  * Close incoming connections when MessagingService is stopped (CASSANDRA-9238)
 
+
 2.1.5
  * Re-add deprecated cold_reads_to_omit param for backwards compat (CASSANDRA-9203)
  * Make anticompaction visible in compactionstats (CASSANDRA-9098)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f6bf365/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 63afcbc..57f4b90 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -34,13 +34,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.*;
-
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class CommitLogReplayer
@@ -58,6 +58,8 @@ public class CommitLogReplayer
     private final PureJavaCrc32 checksum;
     private byte[] buffer;
 
+    private final ReplayFilter replayFilter;
+
     public CommitLogReplayer()
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
@@ -68,6 +70,8 @@ public class CommitLogReplayer
         this.replayedCount = new AtomicInteger();
         this.checksum = new PureJavaCrc32();
 
+        replayFilter = ReplayFilter.create();
+
         // compute per-CF and global replay positions
         cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
@@ -81,7 +85,27 @@ public class CommitLogReplayer
             // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
             ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
             if (truncatedAt != null)
-                rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+            {
+                // Point in time restore is taken to mean that the tables need to be recovered even if they were
+                // deleted at a later point in time. Any truncation record after that point must thus be cleared prior
+                // to recovery (CASSANDRA-9195).
+                long restoreTime = CommitLog.instance.archiver.restorePointInTime;
+                long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
+                if (truncatedTime > restoreTime)
+                {
+                    if (replayFilter.includes(cfs.metadata))
+                    {
+                        logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.",
+                                    cfs.metadata.ksName,
+                                    cfs.metadata.cfName);
+                        SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
+                    }
+                }
+                else
+                {
+                    rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
+                }
+            }
 
             cfPositions.put(cfs.metadata.cfId, rp);
         }
@@ -167,6 +191,8 @@ public class CommitLogReplayer
     {
         public abstract Iterable<ColumnFamily> filter(Mutation mutation);
 
+        public abstract boolean includes(CFMetaData metadata);
+
         public static ReplayFilter create()
         {
             // If no replaylist is supplied an empty array of strings is used to replay everything.
@@ -183,7 +209,8 @@ public class CommitLogReplayer
                 Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
                 if (ks == null)
                     throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
-                if (ks.getColumnFamilyStore(pair[1]) == null)
+                ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]);
+                if (cfs == null)
                     throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
 
                 toReplay.put(pair[0], pair[1]);
@@ -198,6 +225,11 @@ public class CommitLogReplayer
         {
             return mutation.getColumnFamilies();
         }
+
+        public boolean includes(CFMetaData metadata)
+        {
+            return true;
+        }
     }
 
     private static class CustomReplayFilter extends ReplayFilter
@@ -223,11 +255,15 @@ public class CommitLogReplayer
                 }
             });
         }
+
+        public boolean includes(CFMetaData metadata)
+        {
+            return toReplay.containsEntry(metadata.ksName, metadata.cfName);
+        }
     }
 
     public void recover(File file) throws IOException
     {
-        final ReplayFilter replayFilter = ReplayFilter.create();
         logger.info("Replaying {}", file.getPath());
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
         final long segmentId = desc.id;

Reply via email to