merge from 2.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f01166a5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f01166a5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f01166a5

Branch: refs/heads/trunk
Commit: f01166a5358777a6d749d488fdcee367f4b1a949
Parents: c2bd0d8 66df206
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Thu Nov 7 13:54:41 2013 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Thu Nov 7 13:54:41 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/commitlog/CommitLogReplayer.java         | 101 ++++++++++++++++---
 2 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f01166a5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4b1f030,6ff2799..50956f5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 +2.1
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871)
 +
 +
  2.0.3
+  * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809)
   * Avoid flushing compaction_history after each operation (CASSANDRA-6287)
   * Fix repair assertion error when tombstones expire (CASSANDRA-6277)
   * Skip loading corrupt key cache (CASSANDRA-6260)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f01166a5/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 532736b,74daecb..c9659c4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -110,9 -114,73 +114,72 @@@ public class CommitLogReplaye
          return replayedCount.get();
      }
  
+     private abstract static class ReplayFilter
+     {
+         public abstract Iterable<ColumnFamily> filter(RowMutation rm);
+ 
+         public static ReplayFilter create()
+         {
+             // If no replaylist is supplied an empty array of strings is used 
to replay everything.
+             if (System.getProperty("cassandra.replayList") == null)
+                 return new AlwaysReplayFilter();
+ 
+             Multimap<String, String> toReplay = HashMultimap.create();
+             for (String rawPair : 
System.getProperty("cassandra.replayList").split(","))
+             {
+                 String[] pair = rawPair.trim().split(".");
+                 if (pair.length != 2)
+                     throw new IllegalArgumentException("Each table to be 
replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
+ 
+                 Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
+                 if (ks == null)
+                     throw new IllegalArgumentException("Unknown keyspace " + 
pair[0]);
+                 if (ks.getColumnFamilyStore(pair[1]) == null)
+                     throw new IllegalArgumentException(String.format("Unknown 
table %s.%s", pair[0], pair[1]));
+ 
+                 toReplay.put(pair[0], pair[1]);
+             }
+             return new CustomReplayFilter(toReplay);
+         }
+     }
+ 
+     private static class AlwaysReplayFilter extends ReplayFilter
+     {
+         public Iterable<ColumnFamily> filter(RowMutation rm)
+         {
+             return rm.getColumnFamilies();
+         }
+     }
+ 
+     private static class CustomReplayFilter extends ReplayFilter
+     {
+         private Multimap<String, String> toReplay;
+ 
+         public CustomReplayFilter(Multimap<String, String> toReplay)
+         {
+             this.toReplay = toReplay;
+         }
+ 
+         public Iterable<ColumnFamily> filter(RowMutation rm)
+         {
+             final Collection<String> cfNames = 
toReplay.get(rm.getKeyspaceName());
+             if (cfNames == null)
+                 return Collections.emptySet();
+ 
+             return Iterables.filter(rm.getColumnFamilies(), new 
Predicate<ColumnFamily>()
+             {
+                 public boolean apply(ColumnFamily cf)
+                 {
+                     return cfNames.contains(cf.metadata().cfName);
+                 }
+             });
+         }
+     }
+ 
      public void recover(File file) throws IOException
      {
+         final ReplayFilter replayFilter = ReplayFilter.create();
 -
 -        logger.info("Replaying " + file.getPath());
 +        logger.info("Replaying {}", file.getPath());
          CommitLogDescriptor desc = 
CommitLogDescriptor.fromFileName(file.getName());
          final long segment = desc.id;
          int version = desc.getMessagingVersion();
@@@ -239,10 -307,10 +306,9 @@@
                  }
  
                  if (logger.isDebugEnabled())
 -                    logger.debug(String.format("replaying mutation for %s.%s: 
%s", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + 
StringUtils.join(rm.getColumnFamilies().iterator(), ", ")
 -                            + "}"));
 +                    logger.debug("replaying mutation for {}.{}: {}", 
rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + 
StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
  
                  final long entryLocation = reader.getFilePointer();
-                 final RowMutation frm = rm;
                  Runnable runnable = new WrappedRunnable()
                  {
                      public void runMayThrow() throws IOException

Reply via email to