Author: jbellis
Date: Thu Oct 7 00:09:37 2010
New Revision: 1005296
URL: http://svn.apache.org/viewvc?rev=1005296&view=rev
Log:
add cache save/load. patch by mdennis and jbellis for CASSANDRA-1417
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/conf/storage-conf.xml
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Oct 7 00:09:37 2010
@@ -41,10 +41,12 @@ dev
* add memtable, cache information to GCInspector logs (CASSANDRA-1558)
* enable/disable HintedHandoff via JMX (CASSANDRA-1550)
* allow nodes to change IPs between restarts (CASSANDRA-1518)
+ * add save/load of row + key caches (CASSANDRA-1417)
* remove assertion causing rare (and harmless) error messages in
commitlog (CASSANDRA-1330)
* optimize SSTableReader.loadIndexFile to reduce re-reading of index
data during server startup (CASSANDRA-1526)
+ * add cache save/load ability (CASSANDRA-1417)
0.6.5
Modified: cassandra/branches/cassandra-0.6/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/conf/storage-conf.xml?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/conf/storage-conf.xml (original)
+++ cassandra/branches/cassandra-0.6/conf/storage-conf.xml Thu Oct 7 00:09:37
2010
@@ -110,11 +110,19 @@
~ ratios. Specify a fraction (value less than 1), a percentage (ending
in
~ a % sign) or an absolute number of rows to cache.
~ RowsCached defaults to 0, i.e., row cache is off by default.
+ ~
+ ~ Row and key caches may also be saved periodically; if so, the last-
+ ~ saved cache will be loaded in at server start. By default, cache
+ ~ saving is off.
~
~ Remember, when using caches as a percentage, they WILL grow with
~ your data set!
-->
- <ColumnFamily Name="Standard1" CompareWith="BytesType"/>
+ <ColumnFamily Name="Standard1" CompareWith="BytesType"
+ KeysCached="1000"
+ RowsCached="100"
+ RowCacheSavePeriodInSeconds="0"
+ KeyCacheSavePeriodInSeconds="3600"/>
<ColumnFamily Name="Standard2"
CompareWith="UTF8Type"
KeysCached="100%"/>
@@ -204,6 +212,7 @@
~ disk. Keep the data disks and the CommitLog disks separate for best
~ performance
-->
+ <SavedCachesDirectory>/var/lib/cassandra/saved_caches</SavedCachesDirectory>
<CommitLogDirectory>/var/lib/cassandra/commitlog</CommitLogDirectory>
<DataFileDirectories>
<DataFileDirectory>/var/lib/cassandra/data</DataFileDirectory>
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/cache/InstrumentedCache.java
Thu Oct 7 00:09:37 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.cache;
*/
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import
com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
@@ -123,4 +124,9 @@ public class InstrumentedCache<K, V>
requests.set(0);
hits.set(0);
}
+
+ public Set<K> getKeySet()
+ {
+ return map.keySet();
+ }
}
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java?rev=1005296&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
(added)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/RetryingScheduledThreadPoolExecutor.java
Thu Oct 7 00:09:37 2010
@@ -0,0 +1,93 @@
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class RetryingScheduledThreadPoolExecutor extends
ScheduledThreadPoolExecutor
+{
+ protected static Logger logger =
Logger.getLogger(RetryingScheduledThreadPoolExecutor.class);
+
+ public RetryingScheduledThreadPoolExecutor(String threadPoolName, int
priority)
+ {
+ this(1, threadPoolName, priority);
+ }
+
+ public RetryingScheduledThreadPoolExecutor(int corePoolSize, String
threadPoolName, int priority)
+ {
+ super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
+ }
+
+ public RetryingScheduledThreadPoolExecutor(String threadPoolName)
+ {
+ this(1, threadPoolName, Thread.NORM_PRIORITY);
+ }
+
+ @Override
+ protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
RunnableScheduledFuture<V> task)
+ {
+ return new LoggingScheduledFuture<V>(task);
+ }
+
+ private class LoggingScheduledFuture<V> implements
RunnableScheduledFuture<V>
+ {
+ private final RunnableScheduledFuture<V> task;
+
+ public LoggingScheduledFuture(RunnableScheduledFuture<V> task)
+ {
+ this.task = task;
+ }
+
+ public boolean isPeriodic()
+ {
+ return task.isPeriodic();
+ }
+
+ public long getDelay(TimeUnit unit)
+ {
+ return task.getDelay(unit);
+ }
+
+ public int compareTo(Delayed o)
+ {
+ return task.compareTo(o);
+ }
+
+ public void run()
+ {
+ try
+ {
+ task.run();
+ }
+ catch (Exception e)
+ {
+ logger.error("error running scheduled task", e);
+ }
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return task.cancel(mayInterruptIfRunning);
+ }
+
+ public boolean isCancelled()
+ {
+ return task.isCancelled();
+ }
+
+ public boolean isDone()
+ {
+ return task.isDone();
+ }
+
+ public V get() throws InterruptedException, ExecutionException
+ {
+ return task.get();
+ }
+
+ public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException
+ {
+ return task.get(timeout, unit);
+ }
+ }
+}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/CFMetaData.java
Thu Oct 7 00:09:37 2010
@@ -21,12 +21,6 @@ package org.apache.cassandra.config;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.FBUtilities;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
public final class CFMetaData
{
public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
@@ -40,8 +34,11 @@ public final class CFMetaData
public final String comment; // for humans only
public final double rowCacheSize; // default 0
public final double keyCacheSize; // default 0.01
+ public final int rowCacheSavePeriodInSeconds; //default 0 (off)
+ public final int keyCacheSavePeriodInSeconds; //default 0 (off)
- CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator, String comment,
double rowCacheSize, double keyCacheSize)
+ CFMetaData(String tableName, String cfName, String columnType,
AbstractType comparator, AbstractType subcolumnComparator,
+ String comment, double rowCacheSize, double keyCacheSize, int
rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds)
{
this.tableName = tableName;
this.cfName = cfName;
@@ -51,6 +48,8 @@ public final class CFMetaData
this.comment = comment;
this.rowCacheSize = rowCacheSize;
this.keyCacheSize = keyCacheSize;
+ this.rowCacheSavePeriodInSeconds = rowCacheSavePeriodInSeconds;
+ this.keyCacheSavePeriodInSeconds = keyCacheSavePeriodInSeconds;
}
// a quick and dirty pretty printer for describing the column family...
@@ -73,6 +72,8 @@ public final class CFMetaData
&& FBUtilities.equals(other.subcolumnComparator,
subcolumnComparator)
&& FBUtilities.equals(other.comment, comment)
&& other.rowCacheSize == rowCacheSize
- && other.keyCacheSize == keyCacheSize;
+ && other.keyCacheSize == keyCacheSize
+ && other.rowCacheSavePeriodInSeconds ==
rowCacheSavePeriodInSeconds
+ && other.keyCacheSavePeriodInSeconds ==
keyCacheSavePeriodInSeconds;
}
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Oct 7 00:09:37 2010
@@ -18,18 +18,31 @@
package org.apache.cassandra.config;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.xpath.XPathExpressionException;
+
import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.locator.IEndPointSnitch;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.IEndPointSnitch;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.XMLUtils;
import org.apache.log4j.Logger;
@@ -37,17 +50,6 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.TransformerException;
-import javax.xml.xpath.XPathExpressionException;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.net.URL;
-
public class DatabaseDescriptor
{
private static Logger logger = Logger.getLogger(DatabaseDescriptor.class);
@@ -82,6 +84,7 @@ public class DatabaseDescriptor
/* Current index into the above list of directories */
private static int currentIndex = 0;
private static String logFileDirectory;
+ private static String savedCachesDirectory;
private static int consistencyThreads = 4; // not configurable
private static int concurrentReaders = 8;
private static int concurrentWriters = 32;
@@ -142,6 +145,25 @@ public class DatabaseDescriptor
private final static String STORAGE_CONF_FILE = "storage-conf.xml";
+ public static final int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
+ public static final int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
+
+ public static File getSerializedRowCachePath(String ksName, String cfName)
+ {
+ return new File(savedCachesDirectory + File.separator + ksName + "-" +
cfName + "-RowCache");
+ }
+
+ public static File getSerializedKeyCachePath(String ksName, String cfName)
+ {
+ return new File(savedCachesDirectory + File.separator + ksName + "-" +
cfName + "-KeyCache");
+ }
+
+ public static int getCompactionPriority()
+ {
+ String priorityString =
System.getProperty("cassandra.compaction.priority");
+ return priorityString == null ? Thread.NORM_PRIORITY :
Integer.parseInt(priorityString);
+ }
+
/**
* Try the storage-config system property, and then inspect the classpath.
*/
@@ -470,6 +492,7 @@ public class DatabaseDescriptor
/* data file and commit log directories. they get created later,
when they're needed. */
dataFileDirectories =
xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
logFileDirectory =
xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+ savedCachesDirectory =
xmlUtils.getNodeValue("/Storage/SavedCachesDirectory");
for (String datadir : dataFileDirectories)
{
@@ -518,7 +541,9 @@ public class DatabaseDescriptor
null,
"persistent metadata for the local node",
0.0,
-
0.01));
+
0.01,
+
DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+
DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS));
systemMeta.cfMetaData.put(HintedHandOffManager.HINTS_CF, new
CFMetaData(Table.SYSTEM_TABLE,
HintedHandOffManager.HINTS_CF,
@@ -527,7 +552,9 @@ public class DatabaseDescriptor
new BytesType(),
"hinted handoff data",
0.0,
-
0.01));
+
0.01,
+
DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+
DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS));
/* Load the seeds for node contact points */
String[] seedsxml = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
@@ -732,7 +759,11 @@ public class DatabaseDescriptor
String comment = xmlUtils.getNodeValue(xqlCF + "Comment");
// insert it into the table dictionary.
- meta.cfMetaData.put(cfName, new CFMetaData(tableName,
cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize,
keyCacheSize));
+ String rowCacheSavePeriodString =
XMLUtils.getAttributeValue(columnFamily, "RowCacheSavePeriodInSeconds");
+ String keyCacheSavePeriodString =
XMLUtils.getAttributeValue(columnFamily, "KeyCacheSavePeriodInSeconds");
+ int rowCacheSavePeriod = keyCacheSavePeriodString != null
? Integer.valueOf(keyCacheSavePeriodString) :
DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS;
+ int keyCacheSavePeriod = rowCacheSavePeriodString != null
? Integer.valueOf(rowCacheSavePeriodString) :
DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS;
+ meta.cfMetaData.put(cfName, new CFMetaData(tableName,
cfName, columnType, comparator, subcolumnComparator, comment, rowCacheSize,
keyCacheSize, keyCacheSavePeriod, rowCacheSavePeriod));
}
tables.put(meta.name, meta);
@@ -806,6 +837,11 @@ public class DatabaseDescriptor
throw new ConfigurationException("CommitLogDirectory must be
specified");
}
FileUtils.createDirectory(logFileDirectory);
+ if (savedCachesDirectory == null)
+ {
+ throw new ConfigurationException("SavedCachesDirectory must be
specified");
+ }
+ FileUtils.createDirectory(savedCachesDirectory);
}
catch (ConfigurationException ex) {
logger.error("Fatal error: " + ex.getMessage());
@@ -864,7 +900,7 @@ public class DatabaseDescriptor
{
return partitioner;
}
-
+
public static IEndPointSnitch getEndPointSnitch(String table)
{
return tables.get(table).epSnitch;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Oct 7 00:09:37 2010
@@ -18,10 +18,9 @@
package org.apache.cassandra.db;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.lang.management.ManagementFactory;
+import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,11 +34,11 @@ import javax.management.ObjectName;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
-import org.apache.log4j.Logger;
import org.apache.commons.collections.IteratorUtils;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
@@ -48,15 +47,19 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.*;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.*;
+import org.apache.log4j.Logger;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
+ private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
+ new RetryingScheduledThreadPoolExecutor("CACHE-SAVER",
Thread.MIN_PRIORITY);
+
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
/*
@@ -119,7 +122,9 @@ public class ColumnFamilyStore implement
private long maxRowCompactedSize = 0L;
private long rowsCompactedTotalSize = 0L;
private long rowsCompactedCount = 0L;
-
+ private Runnable rowCacheWriteTask;
+ private Runnable keyCacheWriteTask;
+
ColumnFamilyStore(String table, String columnFamilyName, boolean isSuper,
int indexValue) throws IOException
{
table_ = table;
@@ -134,6 +139,7 @@ public class ColumnFamilyStore implement
// scan for data files corresponding to this CF
List<File> sstableFiles = new ArrayList<File>();
Pattern auxFilePattern =
Pattern.compile("(.*)(-Filter\\.db$|-Index\\.db$)");
+ Pattern tmpCacheFilePattern = Pattern.compile(table + "-" +
columnFamilyName + "-(Key|Row)Cache.*\\.tmp$");
for (File file : files())
{
String filename = file.getName();
@@ -157,6 +163,13 @@ public class ColumnFamilyStore implement
continue;
}
+ if (tmpCacheFilePattern.matcher(filename).matches())
+ {
+ logger_.info("removing incomplete saved cache " +
file.getAbsolutePath());
+ FileUtils.deleteWithConfirm(file);
+ continue;
+ }
+
if (filename.contains("-Data.db"))
{
sstableFiles.add(file.getAbsoluteFile());
@@ -165,6 +178,8 @@ public class ColumnFamilyStore implement
Collections.sort(sstableFiles, new FileUtils.FileComparator());
/* Load the index files and the Bloom Filters associated with them. */
+ ssTables_ = new SSTableTracker(table, columnFamilyName);
+ Set<String> savedKeys =
readSavedCache(DatabaseDescriptor.getSerializedKeyCachePath(table,
columnFamilyName), false);
List<SSTableReader> sstables = new ArrayList<SSTableReader>();
for (File file : sstableFiles)
{
@@ -175,7 +190,7 @@ public class ColumnFamilyStore implement
SSTableReader sstable;
try
{
- sstable = SSTableReader.open(filename);
+ sstable = SSTableReader.open(filename, savedKeys, ssTables_);
}
catch (IOException ex)
{
@@ -184,10 +199,108 @@ public class ColumnFamilyStore implement
}
sstables.add(sstable);
}
- ssTables_ = new SSTableTracker(table, columnFamilyName);
ssTables_.add(sstables);
}
+ protected Set<String> readSavedCache(File path, boolean sort) throws
IOException
+ {
+ Set<String> keys;
+ if (sort)
+ {
+ // sort the results on read because cache may be written many
times during server lifetime,
+ // so better to pay that price once on startup than sort at write
time.
+ keys = new TreeSet<String>(StorageProxy.keyComparator);
+ }
+ else
+ {
+ keys = new HashSet<String>();
+ }
+
+ long start = System.currentTimeMillis();
+ if (path.exists())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("reading saved cache from " + path);
+ ObjectInputStream in = new ObjectInputStream(new
BufferedInputStream(new FileInputStream(path)));
+ Charset UTF8 = Charset.forName("UTF-8");
+ while (in.available() > 0)
+ {
+ int size = in.readInt();
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+ keys.add(new String(bytes, UTF8));
+ }
+ in.close();
+ if (logger_.isDebugEnabled())
+ logger_.debug(String.format("completed reading (%d ms; %d
keys) from saved cache at %s",
+ (System.currentTimeMillis() -
start), keys.size(), path));
+ }
+
+ return keys;
+ }
+
+ // must be called after all sstables are loaded since row cache merges all
row versions
+ public void initRowCache()
+ {
+ String msgSuffix = String.format(" row cache for %s of %s",
columnFamily_, table_);
+ int rowCacheSavePeriodInSeconds =
DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).rowCacheSavePeriodInSeconds;
+ int keyCacheSavePeriodInSeconds =
DatabaseDescriptor.getTableMetaData(table_).get(columnFamily_).keyCacheSavePeriodInSeconds;
+
+ try
+ {
+ long start = System.currentTimeMillis();
+ logger_.info(String.format("loading%s", msgSuffix));
+ for (String key :
readSavedCache(DatabaseDescriptor.getSerializedRowCachePath(table_,
columnFamily_), true))
+ cacheRow(key);
+ logger_.info(String.format("completed loading (%d ms; %d keys) %s",
+ System.currentTimeMillis()-start,
ssTables_.getRowCache().getSize(), msgSuffix));
+ }
+ catch (IOException ioe)
+ {
+ logger_.warn("error loading " + msgSuffix, ioe);
+ }
+
+ rowCacheWriteTask = new WrappedRunnable()
+ {
+ protected void runMayThrow() throws IOException
+ {
+ ssTables_.saveRowCache();
+ }
+ };
+ if (rowCacheSavePeriodInSeconds > 0)
+ {
+ cacheSavingExecutor.scheduleWithFixedDelay(rowCacheWriteTask,
+
rowCacheSavePeriodInSeconds,
+
rowCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
+ }
+
+ keyCacheWriteTask = new WrappedRunnable()
+ {
+ protected void runMayThrow() throws IOException
+ {
+ ssTables_.saveKeyCache();
+ }
+ };
+ if (keyCacheSavePeriodInSeconds > 0)
+ {
+ cacheSavingExecutor.scheduleWithFixedDelay(keyCacheWriteTask,
+
keyCacheSavePeriodInSeconds,
+
keyCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
+ }
+ }
+
+ public Future<?> submitKeyCacheWrite()
+ {
+ return cacheSavingExecutor.submit(keyCacheWriteTask);
+ }
+
+ public Future<?> submitRowCacheWrite()
+ {
+ return cacheSavingExecutor.submit(rowCacheWriteTask);
+ }
+
public void addToCompactedRowStats(Long rowsize)
{
if (minRowCompactedSize < 1 || rowsize < minRowCompactedSize)
@@ -286,7 +399,7 @@ public class ColumnFamilyStore implement
}
private static String getColumnFamilyFromFileName(String filename)
- {
+ {
return filename.split("-")[0];
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Thu Oct 7 00:09:37 2010
@@ -18,34 +18,33 @@
package org.apache.cassandra.db;
-import java.io.IOException;
import java.io.File;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import javax.management.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
-import org.apache.log4j.Logger;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import java.net.InetAddress;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
-import org.apache.commons.collections.PredicateUtils;
-
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
@@ -566,9 +565,7 @@ public class CompactionManager implement
public CompactionExecutor()
{
- super("COMPACTION-POOL",
System.getProperty("cassandra.compaction.priority") == null
- ? Thread.NORM_PRIORITY
- :
Integer.parseInt(System.getProperty("cassandra.compaction.priority")));
+ super("COMPACTION-POOL",
DatabaseDescriptor.getCompactionPriority());
}
@Override
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
Thu Oct 7 00:09:37 2010
@@ -18,30 +18,29 @@
package org.apache.cassandra.db;
-import java.util.*;
-import java.io.IOException;
import java.io.File;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.SSTableDeletingReference;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
-
-import java.net.InetAddress;
-
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.db.filter.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class Table
{
@@ -181,8 +180,6 @@ public class Table
private final Table.TableMetadata tableMetadata;
/* ColumnFamilyStore per column family */
private final Map<String, ColumnFamilyStore> columnFamilyStores = new
HashMap<String, ColumnFamilyStore>();
- // cache application CFs since Range queries ask for them a _lot_
- private SortedSet<String> applicationColumnFamilies;
public static Table open(String table) throws IOException
{
@@ -198,12 +195,16 @@ public class Table
{
tableInstance = new Table(table);
instances.put(table, tableInstance);
+
+ //table has to be constructed and in the cache before
cacheRow can be called
+ for (ColumnFamilyStore cfs :
tableInstance.getColumnFamilyStores())
+ cfs.initRowCache();
}
}
}
return tableInstance;
}
-
+
public Set<String> getColumnFamilies()
{
return tableMetadata.getColumnFamilies();
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
Thu Oct 7 00:09:37 2010
@@ -23,12 +23,7 @@ import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.util.*;
import org.apache.cassandra.cache.InstrumentedCache;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,6 +37,7 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.log4j.Logger;
/**
* SSTableReaders are open()ed by Table.onStart; after that they are created
by SSTableWriter.renameAndOpen.
@@ -111,12 +107,23 @@ public class SSTableReader extends SSTab
/** public, but only for tests */
public static SSTableReader open(String dataFileName, IPartitioner
partitioner) throws IOException
{
+ return open(dataFileName, partitioner, Collections.<String>emptySet(),
null);
+ }
+
+ public static SSTableReader open(String dataFileName, Collection<String>
savedKeyCacheKeys, SSTableTracker tracker) throws IOException
+ {
+ return open(dataFileName, StorageService.getPartitioner(),
savedKeyCacheKeys, tracker);
+ }
+
+ public static SSTableReader open(String dataFileName, IPartitioner
partitioner, Collection<String> savedKeyCacheKeys, SSTableTracker tracker)
throws IOException
+ {
assert partitioner != null;
long start = System.currentTimeMillis();
SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
- logger.info("Sampling index for " + dataFileName);
- sstable.loadIndexFile();
+ sstable.setTrackedBy(tracker);
+ logger.info("Sampling index and loading saved keyCache for " +
dataFileName + " (" + savedKeyCacheKeys.size() + " saved keys)");
+ sstable.loadIndexAndCache(savedKeyCacheKeys);
sstable.loadBloomFilter();
if (logger.isDebugEnabled())
@@ -178,13 +185,16 @@ public class SSTableReader extends SSTab
this.bf = bloomFilter;
}
- public void setTrackedBy(SSTableTracker tracker)
+ protected void setTrackedBy(SSTableTracker tracker)
{
- phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
- finalizers.add(phantomReference);
- // TODO keyCache should never be null in live Cassandra, but only
setting it here
- // means it can be during tests, so we have to do
otherwise-unnecessary != null checks
- keyCache = tracker.getKeyCache();
+ if (tracker != null)
+ {
+ phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
+ finalizers.add(phantomReference);
+ // TODO keyCache should never be null in live Cassandra, but only
setting it here
+ // means it can be during tests, so we have to do
otherwise-unnecessary != null checks
+ keyCache = tracker.getKeyCache();
+ }
}
private static MappedByteBuffer mmap(String filename, long start, int
size) throws IOException
@@ -237,7 +247,7 @@ public class SSTableReader extends SSTab
}
}
- void loadIndexFile() throws IOException
+ void loadIndexAndCache(Collection<String> keysToLoadInCache) throws
IOException
{
// we read the positions in a BRAF so we don't have to worry about an
entry spanning a mmap boundary.
// any entries that do, we force into the in-memory sample so key
lookup can always bsearch within
@@ -246,6 +256,9 @@ public class SSTableReader extends SSTab
BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(), "r");
try
{
+ if (keyCache != null && keyCache.getCapacity() -
keyCache.getSize() < keysToLoadInCache.size())
+ keyCache.updateCapacity(keyCache.getSize() +
keysToLoadInCache.size());
+
long indexSize = input.length();
// we need to know both the current index entry and its data
position, as well as the
// next such pair, in order to compute tne mmap-spanning entries.
since seeking
@@ -270,8 +283,13 @@ public class SSTableReader extends SSTab
nextEntry = new IndexSummary.KeyPosition(key, indexPosition);
nextDataPos = dataPosition;
- indexSummary.maybeAddEntry(thisEntry.key, thisDataPos,
nextDataPos - thisDataPos, thisEntry.indexPosition, nextEntry.indexPosition);
-
+ SSTable.PositionSize posSize = new PositionSize(thisDataPos,
nextDataPos - thisDataPos);
+ if (keyCache != null &&
keysToLoadInCache.contains(thisEntry.key.key))
+ keyCache.put(new Pair<String, DecoratedKey>(path,
thisEntry.key), posSize);
+
+ indexSummary.maybeAddEntry(thisEntry.key, posSize.position,
posSize.size, thisEntry.indexPosition, nextEntry.indexPosition);
+ //indexSummary.maybeAddEntry(thisEntry.key, thisDataPos,
nextDataPos - thisDataPos, thisEntry.indexPosition, nextEntry.indexPosition);
+
thisEntry = nextEntry;
thisDataPos = nextDataPos;
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableTracker.java
Thu Oct 7 00:09:37 2010
@@ -21,16 +21,18 @@ package org.apache.cassandra.io;
*/
+import java.io.*;
+import java.nio.charset.Charset;
import java.util.*;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Function;
+
import org.apache.cassandra.cache.JMXInstrumentedCache;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.Pair;
-
import org.apache.log4j.Logger;
public class SSTableTracker implements Iterable<SSTableReader>
@@ -56,6 +58,61 @@ public class SSTableTracker implements I
rowCache = new JMXInstrumentedCache<String, ColumnFamily>(ksname,
cfname + "RowCache", 0);
}
+ protected class CacheWriter<K, V>
+ {
+ public void saveCache(JMXInstrumentedCache<K, V> cache, File
savedCachePath, Function<K, byte[]> converter) throws IOException
+ {
+ long start = System.currentTimeMillis();
+ String msgSuffix = " " + savedCachePath.getName() + " for " +
cfname + " of " + ksname;
+ logger.debug("saving" + msgSuffix);
+ int count = 0;
+ File tmpFile = File.createTempFile(savedCachePath.getName(), null,
savedCachePath.getParentFile());
+ FileOutputStream fout = new FileOutputStream(tmpFile);
+ ObjectOutputStream out = new ObjectOutputStream(new
BufferedOutputStream(fout));
+ FileDescriptor fd = fout.getFD();
+ for (K key : cache.getKeySet())
+ {
+ byte[] bytes = converter.apply(key);
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ ++count;
+ }
+ out.flush();
+ fd.sync();
+ out.close();
+ if (!tmpFile.renameTo(savedCachePath))
+ throw new IOException("Unable to rename cache to " +
savedCachePath);
+ if (logger.isDebugEnabled())
+ logger.debug("saved " + count + " keys in " +
(System.currentTimeMillis() - start) + " ms from" + msgSuffix);
+ }
+ }
+
+ public void saveKeyCache() throws IOException
+ {
+ Function<Pair<String, DecoratedKey>, byte[]> function = new
Function<Pair<String, DecoratedKey>, byte[]>()
+ {
+ public byte[] apply(Pair<String, DecoratedKey> key)
+ {
+ return key.right.key.getBytes(Charset.forName("UTF-8"));
+ }
+ };
+ CacheWriter<Pair<String, DecoratedKey>, SSTable.PositionSize> writer =
new CacheWriter<Pair<String, DecoratedKey>, SSTable.PositionSize>();
+ writer.saveCache(keyCache,
DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+ }
+
+ public void saveRowCache() throws IOException
+ {
+ Function<String, byte[]> function = new Function<String, byte[]>()
+ {
+ public byte[] apply(String key)
+ {
+ return key.getBytes(Charset.forName("UTF-8"));
+ }
+ };
+ CacheWriter<String, ColumnFamily> writer = new CacheWriter<String,
ColumnFamily>();
+ writer.saveCache(rowCache,
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+ }
+
public synchronized void replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements) throws IOException
{
Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Thu Oct 7 00:09:37 2010
@@ -30,12 +30,10 @@ import java.util.concurrent.TimeoutExcep
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.log4j.Logger;
+import com.google.common.collect.Multimap;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
-import com.google.common.collect.Multimap;
-
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
@@ -54,6 +52,7 @@ import org.apache.cassandra.thrift.Unava
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.log4j.Logger;
public class StorageProxy implements StorageProxyMBean
@@ -80,7 +79,7 @@ public class StorageProxy implements Sto
}
}
- private static final Comparator<String> keyComparator = new
Comparator<String>()
+ public static final Comparator<String> keyComparator = new
Comparator<String>()
{
public int compare(String o1, String o2)
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Thu Oct 7 00:09:37 2010
@@ -18,46 +18,50 @@
package org.apache.cassandra.service;
-import java.io.IOException;
import java.io.IOError;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.net.InetAddress;
-import javax.management.*;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import org.apache.cassandra.concurrent.*;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.IndexSummary;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.net.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.io.util.FileUtils;
-
-import org.apache.log4j.Logger;
import org.apache.log4j.Level;
-import org.apache.commons.lang.StringUtils;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.HashMultimap;
+import org.apache.log4j.Logger;
/*
* This abstraction contains the token/identifier of this node
@@ -1605,7 +1609,19 @@ public class StorageService implements I
setMode("Node is drained", true);
}
-
+
+ public void saveCaches() throws ExecutionException, InterruptedException
+ {
+ List<Future<?>> futures = new ArrayList<Future<?>>();
+ logger_.debug("submitting cache saves");
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ futures.add(cfs.submitKeyCacheWrite());
+ futures.add(cfs.submitRowCacheWrite());
+ }
+ FBUtilities.waitOnFutures(futures);
+ logger_.debug("cache saves completed");
+ }
// Never ever do this at home. Used by tests.
Map<String, AbstractReplicationStrategy>
setReplicationStrategyUnsafe(Map<String, AbstractReplicationStrategy>
replacement)
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Thu Oct 7 00:09:37 2010
@@ -19,15 +19,14 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
import org.apache.cassandra.dht.Range;
-import java.net.InetAddress;
public interface StorageServiceMBean
@@ -182,4 +181,7 @@ public interface StorageServiceMBean
/** force hint delivery to an endpoint **/
public void deliverHints(String host) throws UnknownHostException;
+
+ /** save row and key caches */
+ public void saveCaches() throws ExecutionException, InterruptedException;
}
Modified: cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml?rev=1005296&r1=1005295&r2=1005296&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml (original)
+++ cassandra/branches/cassandra-0.6/test/conf/storage-conf.xml Thu Oct 7
00:09:37 2010
@@ -28,6 +28,7 @@
<StoragePort>7010</StoragePort>
<ThriftPort>9170</ThriftPort>
<ColumnIndexSizeInKB>4</ColumnIndexSizeInKB>
+ <SavedCachesDirectory>/var/lib/cassandra/saved_caches</SavedCachesDirectory>
<CommitLogDirectory>build/test/cassandra/commitlog</CommitLogDirectory>
<CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
<DataFileDirectories>