Author: jbellis
Date: Thu Oct  7 00:09:37 2010
New Revision: 1005296

URL: http://svn.apache.org/viewvc?rev=1005296&view=rev
Log:
add cache save/load.  patch by mdennis and jbellis for CASSANDRA-1417

Added:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/conf/storage-conf.xml
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Oct  7 00:09:37 2010
@@ -41,10 +41,12 @@ dev
  * add memtable, cache information to GCInspector logs (CASSANDRA-1558)
  * enable/disable HintedHandoff via JMX (CASSANDRA-1550)
  * allow nodes to change IPs between restarts (CASSANDRA-1518)
+ * add save/load of row + key caches (CASSANDRA-1417)
  * remove assertion causing rare (and harmless) error messages in
    commitlog (CASSANDRA-1330)
  * optimize SSTableReader.loadIndexFile to reduce re-reading of index
    data during server startup (CASSANDRA-1526)
+ * add cache save/load ability (CASSANDRA-1417)
 
 
 0.6.5

Modified: cassandra/branches/cassandra-0.6/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/conf/storage-conf.xml?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/conf/storage-conf.xml (original)
+++ cassandra/branches/cassandra-0.6/conf/storage-conf.xml Thu Oct  7 00:09:37 2010
@@ -110,11 +110,19 @@
       ~ ratios. Specify a fraction (value less than 1), a percentage (ending in
       ~ a % sign) or an absolute number of rows to cache.
        ~ RowsCached defaults to 0, i.e., row cache is off by default.
+       ~ 
+       ~ Row and key caches may also be saved periodically; if so, the last-
+       ~ saved cache will be loaded in at server start.  By default, cache
+       ~ saving is off.
        ~
        ~ Remember, when using caches as a percentage, they WILL grow with
        ~ your data set!
       -->
-      <ColumnFamily Name="Standard1" CompareWith="BytesType"/>
+      <ColumnFamily Name="Standard1" CompareWith="BytesType"
+                    KeysCached="1000"
+                    RowsCached="100"
+                    RowCacheSavePeriodInSeconds="0"
+                    KeyCacheSavePeriodInSeconds="3600"/>
       <ColumnFamily Name="Standard2" 
                     CompareWith="UTF8Type"
                     KeysCached="100%"/>
@@ -204,6 +212,7 @@
    ~ disk.  Keep the data disks and the CommitLog disks separate for best
    ~ performance
   -->
+  <SavedCachesDirectory>/var/lib/cassandra/saved_caches</SavedCachesDirectory>
   <CommitLogDirectory>/var/lib/cassandra/commitlog</CommitLogDirectory>
   <DataFileDirectories>
       <DataFileDirectory>/var/lib/cassandra/data</DataFileDirectory>

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
 Thu Oct  7 00:09:37 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.cache;
  */
 
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import 
com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
@@ -123,4 +124,9 @@ public class InstrumentedCache<K, V>
         requests.set(0);
         hits.set(0);
     }
+
+    public Set<K> getKeySet()
+    {
+        return map.keySet();
+    }
 }

Added: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java?rev=1005296&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
 (added)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
 Thu Oct  7 00:09:37 2010
@@ -0,0 +1,93 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class RetryingScheduledThreadPoolExecutor extends 
ScheduledThreadPoolExecutor
+{
+    protected static Logger logger = 
Logger.getLogger(RetryingScheduledThreadPoolExecutor.class);
+
+    public RetryingScheduledThreadPoolExecutor(String threadPoolName, int 
priority)
+    {
+        this(1, threadPoolName, priority);
+    }
+
+    public RetryingScheduledThreadPoolExecutor(int corePoolSize, String 
threadPoolName, int priority)
+    {
+        super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
+    }
+
+    public RetryingScheduledThreadPoolExecutor(String threadPoolName)
+    {
+        this(1, threadPoolName, Thread.NORM_PRIORITY);
+    }
+
+    @Override
+    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, 
RunnableScheduledFuture<V> task)
+    {
+        return new LoggingScheduledFuture<V>(task);
+    }
+
+    private class LoggingScheduledFuture<V> implements 
RunnableScheduledFuture<V>
+    {
+        private final RunnableScheduledFuture<V> task;
+
+        public LoggingScheduledFuture(RunnableScheduledFuture<V> task)
+        {
+            this.task = task;
+        }
+
+        public boolean isPeriodic()
+        {
+            return task.isPeriodic();
+        }
+
+        public long getDelay(TimeUnit unit)
+        {
+            return task.getDelay(unit);
+        }
+
+        public int compareTo(Delayed o)
+        {
+            return task.compareTo(o);
+        }
+
+        public void run()
+        {
+            try
+            {
+                task.run();
+            }
+            catch (Exception e)
+            {
+                logger.error("error running scheduled task", e);
+            }
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning)
+        {
+            return task.cancel(mayInterruptIfRunning);
+        }
+
+        public boolean isCancelled()
+        {
+            return task.isCancelled();
+        }
+
+        public boolean isDone()
+        {
+            return task.isDone();
+        }
+
+        public V get() throws InterruptedException, ExecutionException
+        {
+            return task.get();
+        }
+
+        public V get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException
+        {
+            return task.get(timeout, unit);
+        }
+    }
+}

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
 Thu Oct  7 00:09:37 2010
@@ -21,12 +21,6 @@ package org.apache.cassandra.config;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.FBUtilities;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
 public final class CFMetaData
 {
     public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
@@ -40,8 +34,11 @@ public final class CFMetaData
     public final String comment; // for humans only
     public final double rowCacheSize; // default 0
     public final double keyCacheSize; // default 0.01
+    public final int rowCacheSavePeriodInSeconds; //default 0 (off)
+    public final int keyCacheSavePeriodInSeconds; //default 0 (off)
 
-    CFMetaData(String tableName, String cfName, String columnType, 
AbstractType comparator, AbstractType subcolumnComparator, String comment, 
double rowCacheSize, double keyCacheSize)
+    CFMetaData(String tableName, String cfName, String columnType, 
AbstractType comparator, AbstractType subcolumnComparator,
+               String comment, double rowCacheSize, double keyCacheSize, int 
rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds)
     {
         this.tableName = tableName;
         this.cfName = cfName;
@@ -51,6 +48,8 @@ public final class CFMetaData
         this.comment = comment;
         this.rowCacheSize = rowCacheSize;
         this.keyCacheSize = keyCacheSize;
+        this.rowCacheSavePeriodInSeconds = rowCacheSavePeriodInSeconds;
+        this.keyCacheSavePeriodInSeconds = keyCacheSavePeriodInSeconds;
     }
 
     // a quick and dirty pretty printer for describing the column family...
@@ -73,6 +72,8 @@ public final class CFMetaData
                 && FBUtilities.equals(other.subcolumnComparator, 
subcolumnComparator)
                 && FBUtilities.equals(other.comment, comment)
                 && other.rowCacheSize == rowCacheSize
-                && other.keyCacheSize == keyCacheSize;
+                && other.keyCacheSize == keyCacheSize
+                && other.rowCacheSavePeriodInSeconds == 
rowCacheSavePeriodInSeconds
+                && other.keyCacheSavePeriodInSeconds == 
keyCacheSavePeriodInSeconds;
     }
 }

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Thu Oct  7 00:09:37 2010
@@ -18,18 +18,31 @@
 
 package org.apache.cassandra.config;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.xpath.XPathExpressionException;
+
 import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.locator.IEndPointSnitch;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.XMLUtils;
 import org.apache.log4j.Logger;
@@ -37,17 +50,6 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.TransformerException;
-import javax.xml.xpath.XPathExpressionException;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.net.URL;
-
 public class DatabaseDescriptor
 {
     private static Logger logger = Logger.getLogger(DatabaseDescriptor.class);
@@ -82,6 +84,7 @@ public class DatabaseDescriptor
     /* Current index into the above list of directories */
     private static int currentIndex = 0;
     private static String logFileDirectory;
+    private static String savedCachesDirectory;
     private static int consistencyThreads = 4; // not configurable
     private static int concurrentReaders = 8;
     private static int concurrentWriters = 32;
@@ -142,6 +145,25 @@ public class DatabaseDescriptor
 
     private final static String STORAGE_CONF_FILE = "storage-conf.xml";
 
+    public static final int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
+    public static final int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
+
+    public static File getSerializedRowCachePath(String ksName, String cfName)
+    {
+        return new File(savedCachesDirectory + File.separator + ksName + "-" + 
cfName + "-RowCache");
+    }
+
+    public static File getSerializedKeyCachePath(String ksName, String cfName)
+    {
+        return new File(savedCachesDirectory + File.separator + ksName + "-" + 
cfName + "-KeyCache");
+    }
+
+    public static int getCompactionPriority()
+    {
+        String priorityString = 
System.getProperty("cassandra.compaction.priority");
+        return priorityString == null ? Thread.NORM_PRIORITY : 
Integer.parseInt(priorityString);
+    }
+
     /**
      * Try the storage-config system property, and then inspect the classpath.
      */
@@ -470,6 +492,7 @@ public class DatabaseDescriptor
             /* data file and commit log directories. they get created later, 
when they're needed. */
             dataFileDirectories = 
xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
             logFileDirectory = 
xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+            savedCachesDirectory = 
xmlUtils.getNodeValue("/Storage/SavedCachesDirectory");
 
             for (String datadir : dataFileDirectories)
             {
@@ -518,7 +541,9 @@ public class DatabaseDescriptor
                                                                             
null,
                                                                             
"persistent metadata for the local node",
                                                                             
0.0,
-                                                                            
0.01));
+                                                                            
0.01,
+                                                                            
DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+                                                                            
DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS));
 
             systemMeta.cfMetaData.put(HintedHandOffManager.HINTS_CF, new 
CFMetaData(Table.SYSTEM_TABLE,
                                                                                
     HintedHandOffManager.HINTS_CF,
@@ -527,7 +552,9 @@ public class DatabaseDescriptor
                                                                                
     new BytesType(),
                                                                                
     "hinted handoff data",
                                                                                
     0.0,
-                                                                               
     0.01));
+                                                                               
     0.01,
+                                                                               
     DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+                                                                               
     DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS));
 
             /* Load the seeds for node contact points */
             String[] seedsxml = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
@@ -732,7 +759,11 @@ public class DatabaseDescriptor
                     String comment = xmlUtils.getNodeValue(xqlCF + "Comment");
 
                     // insert it into the table dictionary.
-                    meta.cfMetaData.put(cfName, new CFMetaData(tableName, 
cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, 
keyCacheSize));
+                    String rowCacheSavePeriodString = XMLUtils.getAttributeValue(columnFamily, "RowCacheSavePeriodInSeconds");
+                    String keyCacheSavePeriodString = XMLUtils.getAttributeValue(columnFamily, "KeyCacheSavePeriodInSeconds");
+                    int rowCacheSavePeriod = rowCacheSavePeriodString != null ? Integer.valueOf(rowCacheSavePeriodString) : DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS;
+                    int keyCacheSavePeriod = keyCacheSavePeriodString != null ? Integer.valueOf(keyCacheSavePeriodString) : DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS;
+                    meta.cfMetaData.put(cfName, new CFMetaData(tableName, cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize, keyCacheSize, rowCacheSavePeriod, keyCacheSavePeriod));
                 }
 
                 tables.put(meta.name, meta);
@@ -806,6 +837,11 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("CommitLogDirectory must be 
specified");
             }
             FileUtils.createDirectory(logFileDirectory);
+            if (savedCachesDirectory == null)
+            {
+                throw new ConfigurationException("SavedCachesDirectory must be 
specified");
+            }
+            FileUtils.createDirectory(savedCachesDirectory);
         }
         catch (ConfigurationException ex) {
             logger.error("Fatal error: " + ex.getMessage());
@@ -864,7 +900,7 @@ public class DatabaseDescriptor
     {
         return partitioner;
     }
-    
+
     public static IEndPointSnitch getEndPointSnitch(String table)
     {
         return tables.get(table).epSnitch;

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Thu Oct  7 00:09:37 2010
@@ -18,10 +18,9 @@
 
 package org.apache.cassandra.db;
 
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.lang.management.ManagementFactory;
+import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,11 +34,11 @@ import javax.management.ObjectName;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
-import org.apache.log4j.Logger;
 import org.apache.commons.collections.IteratorUtils;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
@@ -48,15 +47,19 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.utils.*;
+import org.apache.log4j.Logger;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
+    private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
+            new RetryingScheduledThreadPoolExecutor("CACHE-SAVER", 
Thread.MIN_PRIORITY);
+
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
     /*
@@ -119,7 +122,9 @@ public class ColumnFamilyStore implement
     private long maxRowCompactedSize = 0L;
     private long rowsCompactedTotalSize = 0L;
     private long rowsCompactedCount = 0L;
-    
+    private Runnable rowCacheWriteTask;
+    private Runnable keyCacheWriteTask;
+
     ColumnFamilyStore(String table, String columnFamilyName, boolean isSuper, 
int indexValue) throws IOException
     {
         table_ = table;
@@ -134,6 +139,7 @@ public class ColumnFamilyStore implement
         // scan for data files corresponding to this CF
         List<File> sstableFiles = new ArrayList<File>();
         Pattern auxFilePattern = 
Pattern.compile("(.*)(-Filter\\.db$|-Index\\.db$)");
+        Pattern tmpCacheFilePattern = Pattern.compile(table + "-" + 
columnFamilyName + "-(Key|Row)Cache.*\\.tmp$");
         for (File file : files())
         {
             String filename = file.getName();
@@ -157,6 +163,13 @@ public class ColumnFamilyStore implement
                 continue;
             }
 
+            if (tmpCacheFilePattern.matcher(filename).matches())
+            {
+                logger_.info("removing incomplete saved cache " + 
file.getAbsolutePath());
+                FileUtils.deleteWithConfirm(file);
+                continue;
+            }
+
             if (filename.contains("-Data.db"))
             {
                 sstableFiles.add(file.getAbsoluteFile());
@@ -165,6 +178,8 @@ public class ColumnFamilyStore implement
         Collections.sort(sstableFiles, new FileUtils.FileComparator());
 
         /* Load the index files and the Bloom Filters associated with them. */
+        ssTables_ = new SSTableTracker(table, columnFamilyName);
+        Set<String> savedKeys = 
readSavedCache(DatabaseDescriptor.getSerializedKeyCachePath(table, 
columnFamilyName), false);
         List<SSTableReader> sstables = new ArrayList<SSTableReader>();
         for (File file : sstableFiles)
         {
@@ -175,7 +190,7 @@ public class ColumnFamilyStore implement
             SSTableReader sstable;
             try
             {
-                sstable = SSTableReader.open(filename);
+                sstable = SSTableReader.open(filename, savedKeys, ssTables_);
             }
             catch (IOException ex)
             {
@@ -184,10 +199,108 @@ public class ColumnFamilyStore implement
             }
             sstables.add(sstable);
         }
-        ssTables_ = new SSTableTracker(table, columnFamilyName);
         ssTables_.add(sstables);
     }
 
+    protected Set<String> readSavedCache(File path, boolean sort) throws 
IOException
+    {
+        Set<String> keys;
+        if (sort)
+        {
+            // sort the results on read because cache may be written many 
times during server lifetime,
+            // so better to pay that price once on startup than sort at write 
time.
+            keys = new TreeSet<String>(StorageProxy.keyComparator);
+        }
+        else
+        {
+            keys = new HashSet<String>();
+        }
+        
+        long start = System.currentTimeMillis();
+        if (path.exists())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("reading saved cache from " + path);
+            ObjectInputStream in = new ObjectInputStream(new 
BufferedInputStream(new FileInputStream(path)));
+            Charset UTF8 = Charset.forName("UTF-8");
+            while (in.available() > 0)
+            {
+                int size = in.readInt();
+                byte[] bytes = new byte[size];
+                in.readFully(bytes);
+                keys.add(new String(bytes, UTF8));
+            }
+            in.close();
+            if (logger_.isDebugEnabled())
+                logger_.debug(String.format("completed reading (%d ms; %d 
keys) from saved cache at %s",
+                                            (System.currentTimeMillis() - 
start), keys.size(), path));
+        }
+
+        return keys;
+    }
+
+    // must be called after all sstables are loaded since row cache merges all 
row versions
+    public void initRowCache()
+    {
+        String msgSuffix = String.format(" row cache for %s of %s", 
columnFamily_, table_);
+        int rowCacheSavePeriodInSeconds = 
DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).rowCacheSavePeriodInSeconds;
+        int keyCacheSavePeriodInSeconds = 
DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).keyCacheSavePeriodInSeconds;
+
+        try
+        {
+            long start = System.currentTimeMillis();
+            logger_.info(String.format("loading%s", msgSuffix));
+            for (String key : 
readSavedCache(DatabaseDescriptor.getSerializedRowCachePath(table_, 
columnFamily_), true))
+                cacheRow(key);
+            logger_.info(String.format("completed loading (%d ms; %d keys) %s",
+                                       System.currentTimeMillis()-start, 
ssTables_.getRowCache().getSize(), msgSuffix));
+        }
+        catch (IOException ioe)
+        {
+            logger_.warn("error loading " + msgSuffix, ioe);
+        }
+
+        rowCacheWriteTask = new WrappedRunnable()
+        {
+            protected void runMayThrow() throws IOException
+            {
+                ssTables_.saveRowCache();
+            }
+        };
+        if (rowCacheSavePeriodInSeconds > 0)
+        {
+            cacheSavingExecutor.scheduleWithFixedDelay(rowCacheWriteTask,
+                                                       
rowCacheSavePeriodInSeconds,
+                                                       
rowCacheSavePeriodInSeconds,
+                                                       TimeUnit.SECONDS);
+        }
+
+        keyCacheWriteTask = new WrappedRunnable()
+        {
+            protected void runMayThrow() throws IOException
+            {
+                ssTables_.saveKeyCache();
+            }
+        };
+        if (keyCacheSavePeriodInSeconds > 0)
+        {
+            cacheSavingExecutor.scheduleWithFixedDelay(keyCacheWriteTask,
+                                                       
keyCacheSavePeriodInSeconds,
+                                                       
keyCacheSavePeriodInSeconds,
+                                                       TimeUnit.SECONDS);
+        }
+    }
+
+    public Future<?> submitKeyCacheWrite()
+    {
+        return cacheSavingExecutor.submit(keyCacheWriteTask);
+    }
+
+    public Future<?> submitRowCacheWrite()
+    {
+        return cacheSavingExecutor.submit(rowCacheWriteTask);
+    }
+
     public void addToCompactedRowStats(Long rowsize)
     {
         if (minRowCompactedSize < 1 || rowsize < minRowCompactedSize)
@@ -286,7 +399,7 @@ public class ColumnFamilyStore implement
     }
 
     private static String getColumnFamilyFromFileName(String filename)
-            {
+    {
         return filename.split("-")[0];
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 Thu Oct  7 00:09:37 2010
@@ -18,34 +18,33 @@
 
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import javax.management.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
-import org.apache.log4j.Logger;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-import java.net.InetAddress;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
-import org.apache.commons.collections.PredicateUtils;
-
 public class CompactionManager implements CompactionManagerMBean
 {
     public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.db:type=CompactionManager";
@@ -566,9 +565,7 @@ public class CompactionManager implement
 
         public CompactionExecutor()
         {
-            super("COMPACTION-POOL", 
System.getProperty("cassandra.compaction.priority") == null
-                                     ? Thread.NORM_PRIORITY
-                                     : 
Integer.parseInt(System.getProperty("cassandra.compaction.priority")));
+            super("COMPACTION-POOL", 
DatabaseDescriptor.getCompactionPriority());
         }
 
         @Override

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
(original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
Thu Oct  7 00:09:37 2010
@@ -18,30 +18,29 @@
 
 package org.apache.cassandra.db;
 
-import java.util.*;
-import java.io.IOException;
 import java.io.File;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.SSTableDeletingReference;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
-
-import java.net.InetAddress;
-
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.db.filter.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class Table 
 {
@@ -181,8 +180,6 @@ public class Table 
     private final Table.TableMetadata tableMetadata;
     /* ColumnFamilyStore per column family */
     private final Map<String, ColumnFamilyStore> columnFamilyStores = new 
HashMap<String, ColumnFamilyStore>();
-    // cache application CFs since Range queries ask for them a _lot_
-    private SortedSet<String> applicationColumnFamilies;
 
     public static Table open(String table) throws IOException
     {
@@ -198,12 +195,16 @@ public class Table 
                 {
                     tableInstance = new Table(table);
                     instances.put(table, tableInstance);
+
+                    //table has to be constructed and in the cache before 
cacheRow can be called
+                    for (ColumnFamilyStore cfs : 
tableInstance.getColumnFamilyStores())
+                        cfs.initRowCache();
                 }
             }
         }
         return tableInstance;
     }
-        
+
     public Set<String> getColumnFamilies()
     {
         return tableMetadata.getColumnFamilies();

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
 Thu Oct  7 00:09:37 2010
@@ -23,12 +23,7 @@ import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.util.*;
 
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,6 +37,7 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.log4j.Logger;
 
 /**
  * SSTableReaders are open()ed by Table.onStart; after that they are created 
by SSTableWriter.renameAndOpen.
@@ -111,12 +107,23 @@ public class SSTableReader extends SSTab
     /** public, but only for tests */
     public static SSTableReader open(String dataFileName, IPartitioner 
partitioner) throws IOException
     {
+        return open(dataFileName, partitioner, Collections.<String>emptySet(), 
null);
+    }
+
+    public static SSTableReader open(String dataFileName, Collection<String> 
savedKeyCacheKeys, SSTableTracker tracker) throws IOException
+    {
+        return open(dataFileName, StorageService.getPartitioner(), 
savedKeyCacheKeys, tracker);
+    }
+
+    public static SSTableReader open(String dataFileName, IPartitioner 
partitioner, Collection<String> savedKeyCacheKeys, SSTableTracker tracker) 
throws IOException
+    {
         assert partitioner != null;
 
         long start = System.currentTimeMillis();
         SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
-        logger.info("Sampling index for " + dataFileName);
-        sstable.loadIndexFile();
+        sstable.setTrackedBy(tracker);
+        logger.info("Sampling index and loading saved keyCache for " + 
dataFileName + " (" + savedKeyCacheKeys.size() + " saved keys)");
+        sstable.loadIndexAndCache(savedKeyCacheKeys);
         sstable.loadBloomFilter();
 
         if (logger.isDebugEnabled())
@@ -178,13 +185,16 @@ public class SSTableReader extends SSTab
         this.bf = bloomFilter;
     }
 
-    public void setTrackedBy(SSTableTracker tracker)
+    protected void setTrackedBy(SSTableTracker tracker)
     {
-        phantomReference = new SSTableDeletingReference(tracker, this, 
finalizerQueue);
-        finalizers.add(phantomReference);
-        // TODO keyCache should never be null in live Cassandra, but only 
setting it here
-        // means it can be during tests, so we have to do 
otherwise-unnecessary != null checks
-        keyCache = tracker.getKeyCache();
+        if (tracker != null)
+        {
+            phantomReference = new SSTableDeletingReference(tracker, this, 
finalizerQueue);
+            finalizers.add(phantomReference);
+            // TODO keyCache should never be null in live Cassandra, but only 
setting it here
+            // means it can be during tests, so we have to do 
otherwise-unnecessary != null checks
+            keyCache = tracker.getKeyCache();
+        }
     }
 
     private static MappedByteBuffer mmap(String filename, long start, int 
size) throws IOException
@@ -237,7 +247,7 @@ public class SSTableReader extends SSTab
         }
     }
 
-    void loadIndexFile() throws IOException
+    void loadIndexAndCache(Collection<String> keysToLoadInCache) throws 
IOException
     {
         // we read the positions in a BRAF so we don't have to worry about an 
entry spanning a mmap boundary.
         // any entries that do, we force into the in-memory sample so key 
lookup can always bsearch within
@@ -246,6 +256,9 @@ public class SSTableReader extends SSTab
         BufferedRandomAccessFile input = new 
BufferedRandomAccessFile(indexFilename(), "r");
         try
         {
+            if (keyCache != null && keyCache.getCapacity() - 
keyCache.getSize() < keysToLoadInCache.size())
+                keyCache.updateCapacity(keyCache.getSize() + 
keysToLoadInCache.size());
+
             long indexSize = input.length();
             // we need to know both the current index entry and its data 
position, as well as the
         // next such pair, in order to compute the mmap-spanning entries.  
since seeking
@@ -270,8 +283,13 @@ public class SSTableReader extends SSTab
 
                 nextEntry = new IndexSummary.KeyPosition(key, indexPosition);
                 nextDataPos = dataPosition;
-                indexSummary.maybeAddEntry(thisEntry.key, thisDataPos, 
nextDataPos - thisDataPos, thisEntry.indexPosition, nextEntry.indexPosition);
-
+                SSTable.PositionSize posSize = new PositionSize(thisDataPos, 
nextDataPos - thisDataPos);
+                if (keyCache != null && 
keysToLoadInCache.contains(thisEntry.key.key))
+                    keyCache.put(new Pair<String, DecoratedKey>(path, 
thisEntry.key), posSize);
+
+                indexSummary.maybeAddEntry(thisEntry.key, posSize.position, 
posSize.size, thisEntry.indexPosition, nextEntry.indexPosition);
+                //indexSummary.maybeAddEntry(thisEntry.key, thisDataPos, 
nextDataPos - thisDataPos, thisEntry.indexPosition, nextEntry.indexPosition);
+               
                 thisEntry = nextEntry;
                 thisDataPos = nextDataPos;
             }

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
 Thu Oct  7 00:09:37 2010
@@ -21,16 +21,18 @@ package org.apache.cassandra.io;
  */
 
 
+import java.io.*;
+import java.nio.charset.Charset;
 import java.util.*;
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.base.Function;
+
 import org.apache.cassandra.cache.JMXInstrumentedCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.Pair;
-
 import org.apache.log4j.Logger;
 
 public class SSTableTracker implements Iterable<SSTableReader>
@@ -56,6 +58,61 @@ public class SSTableTracker implements I
         rowCache = new JMXInstrumentedCache<String, ColumnFamily>(ksname, 
cfname + "RowCache", 0);
     }
 
+    protected class CacheWriter<K, V>
+    {
+        public void saveCache(JMXInstrumentedCache<K, V> cache, File 
savedCachePath, Function<K, byte[]> converter) throws IOException
+        {
+            long start = System.currentTimeMillis();
+            String msgSuffix = " " + savedCachePath.getName() + " for " + 
cfname + " of " + ksname;
+            logger.debug("saving" + msgSuffix);
+            int count = 0;
+            File tmpFile = File.createTempFile(savedCachePath.getName(), null, 
savedCachePath.getParentFile());
+            FileOutputStream fout = new FileOutputStream(tmpFile);
+            ObjectOutputStream out = new ObjectOutputStream(new 
BufferedOutputStream(fout));
+            FileDescriptor fd = fout.getFD();
+            for (K key : cache.getKeySet())
+            {
+                byte[] bytes = converter.apply(key);
+                out.writeInt(bytes.length);
+                out.write(bytes);
+                ++count;
+            }
+            out.flush();
+            fd.sync();
+            out.close();
+            if (!tmpFile.renameTo(savedCachePath))
+                throw new IOException("Unable to rename cache to " + 
savedCachePath);
+            if (logger.isDebugEnabled())
+                logger.debug("saved " + count + " keys in " + 
(System.currentTimeMillis() - start) + " ms from" + msgSuffix);
+        }
+    }
+
+    public void saveKeyCache() throws IOException
+    {
+        Function<Pair<String, DecoratedKey>, byte[]> function = new 
Function<Pair<String, DecoratedKey>, byte[]>()
+        {
+            public byte[] apply(Pair<String, DecoratedKey> key)
+            {
+                return key.right.key.getBytes(Charset.forName("UTF-8"));
+            }
+        };
+        CacheWriter<Pair<String, DecoratedKey>, SSTable.PositionSize> writer = 
new CacheWriter<Pair<String, DecoratedKey>, SSTable.PositionSize>();
+        writer.saveCache(keyCache, 
DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+    }
+
+    public void saveRowCache() throws IOException
+    {
+        Function<String, byte[]> function = new Function<String, byte[]>()
+        {
+            public byte[] apply(String key)
+            {
+                return key.getBytes(Charset.forName("UTF-8"));
+            }
+        };
+        CacheWriter<String, ColumnFamily> writer = new CacheWriter<String, 
ColumnFamily>();
+        writer.saveCache(rowCache, 
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+    }
+
     public synchronized void replace(Collection<SSTableReader> oldSSTables, 
Iterable<SSTableReader> replacements) throws IOException
     {
         Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Oct  7 00:09:37 2010
@@ -30,12 +30,10 @@ import java.util.concurrent.TimeoutExcep
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.log4j.Logger;
+import com.google.common.collect.Multimap;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
-import com.google.common.collect.Multimap;
-
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -54,6 +52,7 @@ import org.apache.cassandra.thrift.Unava
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.log4j.Logger;
 
 
 public class StorageProxy implements StorageProxyMBean
@@ -80,7 +79,7 @@ public class StorageProxy implements Sto
         }
     }
 
-    private static final Comparator<String> keyComparator = new 
Comparator<String>()
+    public static final Comparator<String> keyComparator = new 
Comparator<String>()
     {
         public int compare(String o1, String o2)
         {

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
 Thu Oct  7 00:09:37 2010
@@ -18,46 +18,50 @@
 
 package org.apache.cassandra.service;
 
-import java.io.IOException;
 import java.io.IOError;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.net.InetAddress;
-import javax.management.*;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-import org.apache.cassandra.concurrent.*;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.IndexSummary;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.io.util.FileUtils;
-
-import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
-import org.apache.commons.lang.StringUtils;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
+import org.apache.log4j.Logger;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -1605,7 +1609,19 @@ public class StorageService implements I
        
         setMode("Node is drained", true);
     }
-    
+
+    public void saveCaches() throws ExecutionException, InterruptedException
+    {
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+        logger_.debug("submitting cache saves");
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            futures.add(cfs.submitKeyCacheWrite());
+            futures.add(cfs.submitRowCacheWrite());
+        }
+        FBUtilities.waitOnFutures(futures);
+        logger_.debug("cache saves completed");
+    }
 
     // Never ever do this at home. Used by tests.
     Map<String, AbstractReplicationStrategy> 
setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy> 
replacement)

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
 Thu Oct  7 00:09:37 2010
@@ -19,15 +19,14 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
 
 import org.apache.cassandra.dht.Range;
-import java.net.InetAddress;
 
 
 public interface StorageServiceMBean
@@ -182,4 +181,7 @@ public interface StorageServiceMBean
     
     /** force hint delivery to an endpoint **/
     public void deliverHints(String host) throws UnknownHostException;
+
+    /** save row and key caches */
+    public void saveCaches() throws ExecutionException, InterruptedException;
 }

Modified: cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml (original)
+++ cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml Thu Oct  7 
00:09:37 2010
@@ -28,6 +28,7 @@
    <StoragePort>7010</StoragePort>
    <ThriftPort>9170</ThriftPort>
    <ColumnIndexSizeInKB>4</ColumnIndexSizeInKB>
+   <SavedCachesDirectory>/var/lib/cassandra/saved_caches</SavedCachesDirectory>
    <CommitLogDirectory>build/test/cassandra/commitlog</CommitLogDirectory>
    <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
    <DataFileDirectories>


Reply via email to