Author: jbellis
Date: Thu Oct 7 02:24:22 2010
New Revision: 1005317
URL: http://svn.apache.org/viewvc?rev=1005317&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/Converter.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/config/RawColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
cassandra/trunk/test/conf/cassandra.yaml
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct 7 02:24:22 2010
@@ -17,6 +17,7 @@ dev
(CASSANDRA-1566)
* allow keyspace creation with RF > N (CASSANDRA-1428)
* improve cli error handling (CASSANDRA-1580)
+ * add cache save/load ability (CASSANDRA-1417)
0.7-beta2
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Oct 7 02:24:22 2010
@@ -55,6 +55,9 @@ data_file_directories:
# commit log
commitlog_directory: /var/lib/cassandra/commitlog
+# saved caches
+saved_caches_directory: /var/lib/cassandra/saved_caches
+
# Size to allow commitlog to grow to before creating a new segment
commitlog_rotation_threshold_in_mb: 128
@@ -334,7 +337,11 @@ index_interval: 128
# before a minor compaction is forced. decreasing this will cause
# minor compactions to start more frequently and be less intensive.
# setting this to 0 disables minor compactions. defaults to 32.
-#
+# - row_cache_save_period_in_seconds: number of seconds between saving row
caches.
+# The row caches can be saved periodically and if one exists on startup
it will be loaded.
+# - key_cache_save_period_in_seconds: number of seconds between saving key
caches.
+# The key caches can be saved periodically and if one exists on startup
it will be loaded.
+#
# NOTE: this keyspace definition is for demonstration purposes only.
# Cassandra will not load these definitions during startup. See
# http://wiki.apache.org/cassandra/FAQ#no_keyspaces for an explanation.
@@ -345,6 +352,10 @@ keyspaces:
column_families:
- name: Standard1
compare_with: BytesType
+ keys_cached: 10000
+ rows_cached: 1000
+ row_cache_save_period_in_seconds: 0
+ key_cache_save_period_in_seconds: 3600
- name: Standard2
compare_with: UTF8Type
Modified: cassandra/trunk/interface/cassandra.genavro
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Oct 7 02:24:22 2010
@@ -152,6 +152,8 @@ protocol Cassandra {
union { null, int } min_compaction_threshold = null;
union { null, int } max_compaction_threshold = null;
union { int, null } id;
+ union { int, null } row_cache_save_period_in_seconds = 0;
+ union { int, null } key_cache_save_period_in_seconds = 3600;
union { array<ColumnDef>, null } column_metadata;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
Thu Oct 7 02:24:22 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.cache;
*/
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import
com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
@@ -123,4 +124,9 @@ public class InstrumentedCache<K, V>
requests.set(0);
hits.set(0);
}
+
+ public Set<K> getKeySet()
+ {
+ return map.keySet();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Thu
Oct 7 02:24:22 2010
@@ -24,23 +24,20 @@ import java.util.concurrent.atomic.Atomi
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import org.apache.cassandra.avro.ColumnDef;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.avro.util.Utf8;
+import org.apache.cassandra.avro.ColumnDef;
import org.apache.cassandra.db.*;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.io.SerDeUtils;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.io.SerDeUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -50,6 +47,8 @@ public final class CFMetaData
public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
+ public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
+ public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 3600;
public final static boolean DEFAULT_PRELOAD_ROW_CACHE = false;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
@@ -86,7 +85,9 @@ public final class CFMetaData
DEFAULT_MIN_COMPACTION_THRESHOLD,
DEFAULT_MAX_COMPACTION_THRESHOLD,
cfId,
- Collections.<byte[],
ColumnDefinition>emptyMap());
+ Collections.<byte[], ColumnDefinition>emptyMap(),
+ DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+ DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS);
}
/**
@@ -130,6 +131,8 @@ public final class CFMetaData
public final AbstractType defaultValidator; // default none, use
comparator types
public final Integer minCompactionThreshold; // default 4
public final Integer maxCompactionThreshold; // default 32
+ public final int rowCacheSavePeriodInSeconds; //default 0 (off)
+ public final int keyCacheSavePeriodInSeconds; //default 3600 (1 hour)
// NOTE: if you find yourself adding members to this class, make sure you
keep the convert methods in lockstep.
public final Map<byte[], ColumnDefinition> column_metadata;
@@ -151,7 +154,10 @@ public final class CFMetaData
int minCompactionThreshold,
int maxCompactionThreshold,
Integer cfId,
- Map<byte[], ColumnDefinition> column_metadata)
+ Map<byte[], ColumnDefinition> column_metadata,
+ int rowCacheSavePeriodInSeconds,
+ int keyCacheSavePeriodInSeconds)
+
{
assert column_metadata != null;
this.tableName = tableName;
@@ -174,6 +180,8 @@ public final class CFMetaData
this.maxCompactionThreshold = maxCompactionThreshold;
this.cfId = cfId;
this.column_metadata = Collections.unmodifiableMap(column_metadata);
+ this.rowCacheSavePeriodInSeconds = rowCacheSavePeriodInSeconds;
+ this.keyCacheSavePeriodInSeconds = keyCacheSavePeriodInSeconds;
}
/** adds this cfm to the map. */
@@ -224,7 +232,9 @@ public final class CFMetaData
minCompactionThreshold,
maxCompactionThreshold,
nextId(),
- column_metadata);
+ column_metadata,
+ DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS,
+ DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS);
}
public static CFMetaData newIndexMetadata(String table, String parentCf,
ColumnDefinition info, AbstractType columnComparator)
@@ -268,7 +278,9 @@ public final class CFMetaData
cfm.minCompactionThreshold,
cfm.maxCompactionThreshold,
cfm.cfId,
- cfm.column_metadata);
+ cfm.column_metadata,
+ cfm.rowCacheSavePeriodInSeconds,
+ cfm.keyCacheSavePeriodInSeconds);
}
/** clones existing CFMetaData. keeps the id but changes the table name.*/
@@ -291,7 +303,9 @@ public final class CFMetaData
cfm.minCompactionThreshold,
cfm.maxCompactionThreshold,
cfm.cfId,
- cfm.column_metadata);
+ cfm.column_metadata,
+ cfm.rowCacheSavePeriodInSeconds,
+ cfm.keyCacheSavePeriodInSeconds);
}
/** used for evicting cf data out of static tracking collections. */
@@ -361,8 +375,12 @@ public final class CFMetaData
column_metadata.put(cd.name, cd);
}
+ //isn't AVRO supposed to handle stuff like this?
Integer minct = cf.min_compaction_threshold == null ?
DEFAULT_MIN_COMPACTION_THRESHOLD : cf.min_compaction_threshold;
Integer maxct = cf.max_compaction_threshold == null ?
DEFAULT_MAX_COMPACTION_THRESHOLD : cf.max_compaction_threshold;
+ Integer row_cache_save_period_in_seconds =
cf.row_cache_save_period_in_seconds == null ?
DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS : cf.row_cache_save_period_in_seconds;
+ Integer key_cache_save_period_in_seconds =
cf.key_cache_save_period_in_seconds == null ?
DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS : cf.key_cache_save_period_in_seconds;
+
return new CFMetaData(cf.keyspace.toString(),
cf.name.toString(),
ColumnFamilyType.create(cf.column_type.toString()),
@@ -380,7 +398,9 @@ public final class CFMetaData
minct,
maxct,
cf.id,
- column_metadata);
+ column_metadata,
+ row_cache_save_period_in_seconds,
+ key_cache_save_period_in_seconds);
}
public boolean equals(Object obj)
@@ -412,6 +432,8 @@ public final class CFMetaData
.append(maxCompactionThreshold, rhs.maxCompactionThreshold)
.append(cfId.intValue(), rhs.cfId.intValue())
.append(column_metadata, rhs.column_metadata)
+ .append(rowCacheSavePeriodInSeconds,
rhs.rowCacheSavePeriodInSeconds)
+ .append(keyCacheSavePeriodInSeconds,
rhs.keyCacheSavePeriodInSeconds)
.isEquals();
}
@@ -435,6 +457,8 @@ public final class CFMetaData
.append(maxCompactionThreshold)
.append(cfId)
.append(column_metadata)
+ .append(rowCacheSavePeriodInSeconds)
+ .append(keyCacheSavePeriodInSeconds)
.toHashCode();
}
@@ -493,7 +517,9 @@ public final class CFMetaData
cf_def.min_compaction_threshold,
cf_def.max_compaction_threshold,
cfId,
- column_metadata);
+ column_metadata,
+ cf_def.row_cache_save_period_in_seconds,
+ cf_def.key_cache_save_period_in_seconds);
}
// merges some final fields from this CFM with modifiable fields from
CfDef into a new CFMetaData.
@@ -538,7 +564,9 @@ public final class CFMetaData
cf_def.min_compaction_threshold,
cf_def.max_compaction_threshold,
cfId,
- column_metadata);
+ column_metadata,
+ rowCacheSavePeriodInSeconds,
+ keyCacheSavePeriodInSeconds);
}
// converts CFM to thrift CfDef
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Thu Oct 7
02:24:22 2010
@@ -85,7 +85,9 @@ public class Config
public Integer in_memory_compaction_limit_in_mb = 256;
public String[] data_file_directories;
-
+
+ public String saved_caches_directory;
+
// Commit Log
public String commitlog_directory;
public Integer commitlog_rotation_threshold_in_mb;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Converter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Converter.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Converter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Converter.java Thu Oct
7 02:24:22 2010
@@ -22,42 +22,36 @@ package org.apache.cassandra.config;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
import javax.xml.xpath.XPathExpressionException;
import org.apache.cassandra.auth.SimpleAuthenticator;
import org.apache.cassandra.auth.SimpleAuthority;
+import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SkipNullRepresenter;
import org.apache.cassandra.utils.XMLUtils;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.w3c.dom.NodeList;
import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.yaml.snakeyaml.Dumper;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.introspector.Property;
-import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.Tag;
-import org.yaml.snakeyaml.representer.Representer;
/**
* @deprecated Yaml configuration for Keyspaces and ColumnFamilies is
deprecated in 0.7
*/
public class Converter
{
-
private static Config conf = new Config();
- private final static String PREVIOUS_CONF_FILE = "cassandra.xml";
-
+
private static List<RawKeyspace> readTablesFromXml(XMLUtils xmlUtils)
throws ConfigurationException
{
@@ -125,7 +119,17 @@ public class Converter
{
ks.column_families[j].rows_cached =
FBUtilities.parseDoubleOrPercent(value);
}
-
+
+ if ((value = XMLUtils.getAttributeValue(columnFamily,
"RowCacheSavePeriodInSeconds")) != null)
+ {
+ ks.column_families[j].row_cache_save_period_in_seconds
= Integer.parseInt(value);
+ }
+
+ if ((value = XMLUtils.getAttributeValue(columnFamily,
"KeyCacheSavePeriodInSeconds")) != null)
+ {
+ ks.column_families[j].key_cache_save_period_in_seconds
= Integer.parseInt(value);
+ }
+
if ((value = XMLUtils.getAttributeValue(columnFamily,
"ReadRepairChance")) != null)
{
ks.column_families[j].read_repair_chance =
FBUtilities.parseDoubleOrPercent(value);
@@ -284,7 +288,9 @@ public class Converter
conf.data_file_directories =
xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
conf.commitlog_directory =
xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
-
+
+ conf.saved_caches_directory =
xmlUtils.getNodeValue("/Storage/SavedCachesDirectory");
+
String value =
xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
if ( value != null)
conf.commitlog_rotation_threshold_in_mb =
Integer.parseInt(value);
@@ -329,39 +335,20 @@ public class Converter
out.close();
}
- public static void main (String[] args)
+ public static void main(String[] args) throws Exception
{
- try
- {
- String oldConfigName;
-
- ClassLoader loader = Converter.class.getClassLoader();
- URL scpurl = loader.getResource(PREVIOUS_CONF_FILE);
- if (scpurl == null)
- scpurl = loader.getResource("storage-conf.xml");
-
- if (scpurl != null)
- oldConfigName = scpurl.getFile();
- else
- throw new ConfigurationException("Error finding previous
configuration file.");
- System.out.println("Found previous configuration: " +
oldConfigName);
- loadPreviousConfig(oldConfigName);
-
- System.out.println("Creating new configuration cassandra.yaml");
- dumpConfig("cassandra.yaml");
- }
- catch (IOException e)
+ if (args.length != 2)
{
- System.out.println("Error creating new configuration file.");
- System.out.println(e.getMessage());
- e.printStackTrace();
- }
- catch (ConfigurationException e)
- {
- System.out.println("There was an error during config conversion.");
- System.out.println(e.getMessage());
- e.printStackTrace();
+ throw new IllegalArgumentException("usage: config-converter
oldfile newfile");
}
+ String oldConfigName = args[0];
+ String newConfigName = args[1];
+
+ if (!new File(oldConfigName).exists())
+ throw new IllegalArgumentException(String.format("%s does not
exist", oldConfigName));
+
+ loadPreviousConfig(oldConfigName);
+ dumpConfig(newConfigName);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Oct 7 02:24:22 2010
@@ -26,12 +26,13 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.AllowAllAuthenticator;
+import org.apache.cassandra.auth.AllowAllAuthority;
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.auth.IAuthority;
import org.apache.cassandra.config.Config.RequestSchedulerId;
import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.ColumnFamilyType;
@@ -43,11 +44,9 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.EndpointSnitchInfo;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.*;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.service.StorageService;
@@ -58,7 +57,7 @@ import org.yaml.snakeyaml.TypeDescriptio
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.error.YAMLException;
-public class DatabaseDescriptor
+public class DatabaseDescriptor
{
private static Logger logger =
LoggerFactory.getLogger(DatabaseDescriptor.class);
@@ -327,13 +326,18 @@ public class DatabaseDescriptor
}
/* data file and commit log directories. they get created later,
when they're needed. */
- if (conf.commitlog_directory != null && conf.data_file_directories
!= null)
+ if (conf.commitlog_directory != null && conf.data_file_directories
!= null && conf.saved_caches_directory != null)
{
for (String datadir : conf.data_file_directories)
{
if (datadir.equals(conf.commitlog_directory))
throw new ConfigurationException("commitlog_directory
must not be the same as any data_file_directories");
+ if (datadir.equals(conf.saved_caches_directory))
+ throw new
ConfigurationException("saved_caches_directory must not be the same as any
data_file_directories");
}
+
+ if
(conf.commitlog_directory.equals(conf.saved_caches_directory))
+ throw new ConfigurationException("saved_caches_directory
must not be the same as the commitlog_directory");
}
else
{
@@ -341,6 +345,8 @@ public class DatabaseDescriptor
throw new ConfigurationException("commitlog_directory
missing");
if (conf.data_file_directories == null)
throw new ConfigurationException("data_file_directories
missing; at least one data directory must be specified");
+ if (conf.saved_caches_directory == null)
+ throw new ConfigurationException("saved_caches_directory
missing");
}
/* threshold after which commit log should be rotated. */
@@ -544,7 +550,7 @@ public class DatabaseDescriptor
if (cf.read_repair_chance < 0.0 || cf.read_repair_chance > 1.0)
{
- throw new ConfigurationException("read_repair_chance must
be between 0.0 and 1.0");
+ throw new ConfigurationException("read_repair_chance must
be between 0.0 and 1.0 (0% and 100%)");
}
if (cf.min_compaction_threshold < 0 ||
cf.max_compaction_threshold < 0)
@@ -673,6 +679,11 @@ public class DatabaseDescriptor
throw new ConfigurationException("commitlog_directory must be
specified");
}
FileUtils.createDirectory(conf.commitlog_directory);
+ if (conf.saved_caches_directory == null)
+ {
+ throw new ConfigurationException("saved_caches_directory must
be specified");
+ }
+ FileUtils.createDirectory(conf.saved_caches_directory);
}
catch (ConfigurationException ex) {
logger.error("Fatal error: " + ex.getMessage());
@@ -907,6 +918,11 @@ public class DatabaseDescriptor
return conf.commitlog_directory;
}
+ public static String getSavedCachesLocation()
+ {
+ return conf.saved_caches_directory;
+ }
+
public static Set<InetAddress> getSeeds()
{
return seeds;
@@ -1116,4 +1132,14 @@ public class DatabaseDescriptor
{
return conf.index_interval;
}
+
+ public static File getSerializedRowCachePath(String ksName, String cfName)
+ {
+ return new File(conf.saved_caches_directory + File.separator + ksName
+ "-" + cfName + "-RowCache");
+ }
+
+ public static File getSerializedKeyCachePath(String ksName, String cfName)
+ {
+ return new File(conf.saved_caches_directory + File.separator + ksName
+ "-" + cfName + "-KeyCache");
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/RawColumnFamily.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/RawColumnFamily.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/RawColumnFamily.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/RawColumnFamily.java
Thu Oct 7 02:24:22 2010
@@ -21,10 +21,6 @@ package org.apache.cassandra.config;
*/
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.ColumnFamilyType;
/**
@@ -46,4 +42,6 @@ public class RawColumnFamily
public int min_compaction_threshold =
CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD;
public int max_compaction_threshold =
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD;
public RawColumnDefinition[] column_metadata = new RawColumnDefinition[0];
+ public int row_cache_save_period_in_seconds =
CFMetaData.DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS;
+ public int key_cache_save_period_in_seconds =
CFMetaData.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu
Oct 7 02:24:22 2010
@@ -18,16 +18,13 @@
package org.apache.cassandra.db;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -39,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -68,6 +66,9 @@ public class ColumnFamilyStore implement
{
private static Logger logger =
LoggerFactory.getLogger(ColumnFamilyStore.class);
+ private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
+ new RetryingScheduledThreadPoolExecutor("CACHE-SAVER",
Thread.MIN_PRIORITY);
+
/*
* submitFlush first puts [Binary]Memtable.getSortedContents on the
flushSorter executor,
* which then puts the sorted results on the writer executor. This is
because sorting is CPU-bound,
@@ -133,6 +134,22 @@ public class ColumnFamilyStore implement
private int minCompactionThreshold;
private int maxCompactionThreshold;
+ private final Runnable rowCacheSaverTask = new WrappedRunnable()
+ {
+ protected void runMayThrow() throws IOException
+ {
+ ssTables.saveRowCache();
+ }
+ };
+
+ private final Runnable keyCacheSaverTask = new WrappedRunnable()
+ {
+ protected void runMayThrow() throws Exception
+ {
+ ssTables.saveKeyCache();
+ }
+ };
+
private ColumnFamilyStore(Table table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
{
assert metadata != null : "null metadata for " + table + ":" +
columnFamilyName;
@@ -148,15 +165,18 @@ public class ColumnFamilyStore implement
if (logger.isDebugEnabled())
logger.debug("Starting CFS {}", columnFamily);
-
+
// scan for sstables corresponding to this cf and load them
+ ssTables = new SSTableTracker(table.name, columnFamilyName);
+ Set<DecoratedKey> savedKeys =
readSavedCache(DatabaseDescriptor.getSerializedKeyCachePath(table.name,
columnFamilyName));
+ logger.info("read " + savedKeys.size() + " from saved key cache");
List<SSTableReader> sstables = new ArrayList<SSTableReader>();
for (Map.Entry<Descriptor,Set<Component>> sstableFiles :
files(table.name, columnFamilyName, false).entrySet())
{
SSTableReader sstable;
try
{
- sstable = SSTableReader.open(sstableFiles.getKey(),
sstableFiles.getValue(), metadata, this.partitioner);
+ sstable = SSTableReader.open(sstableFiles.getKey(),
sstableFiles.getValue(), savedKeys, ssTables, metadata, this.partitioner);
}
catch (FileNotFoundException ex)
{
@@ -170,7 +190,6 @@ public class ColumnFamilyStore implement
}
sstables.add(sstable);
}
- ssTables = new SSTableTracker(table.name, columnFamilyName);
ssTables.add(sstables);
// create the private ColumnFamilyStores for the secondary column
indexes
@@ -196,6 +215,38 @@ public class ColumnFamilyStore implement
}
}
+ protected Set<DecoratedKey> readSavedCache(File path)
+ {
+ Set<DecoratedKey> keys = new TreeSet<DecoratedKey>();
+ try
+ {
+ long start = System.currentTimeMillis();
+
+ if (path.exists())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("reading saved cache from %s",
path));
+ ObjectInputStream in = new ObjectInputStream(new
BufferedInputStream(new FileInputStream(path)));
+ while (in.available() > 0)
+ {
+ int size = in.readInt();
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+
keys.add(StorageService.getPartitioner().decorateKey(bytes));
+ }
+ in.close();
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("completed reading (%d ms; %d
keys) from saved cache at %s",
+ System.currentTimeMillis() -
start, keys.size(), path));
+ }
+ }
+ catch (IOException ioe)
+ {
+ logger.warn(String.format("error reading saved cache at %s",
path.getAbsolutePath()), ioe);
+ }
+ return keys;
+ }
+
public void addIndex(final ColumnDefinition info)
{
assert info.index_type != null;
@@ -371,8 +422,63 @@ public class ColumnFamilyStore implement
}
}
}
+
+ // cleanup incomplete saved caches
+ Pattern tmpCacheFilePattern = Pattern.compile(table + "-" +
columnFamily + "-(Key|Row)Cache.*\\.tmp$");
+ File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
+
+ if (dir.exists())
+ {
+ assert dir.isDirectory();
+ for (File file : dir.listFiles())
+ if (tmpCacheFilePattern.matcher(file.getName()).matches())
+ if (!file.delete())
+ logger.warn("could not delete " +
file.getAbsolutePath());
+ }
}
-
+
+ // must be called after all sstables are loaded since row cache merges all
row versions
+ public void initRowCache()
+ {
+ String msgSuffix = String.format(" row cache for %s of %s",
columnFamily, table.name);
+ int rowCacheSavePeriodInSeconds =
DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).rowCacheSavePeriodInSeconds;
+ int keyCacheSavePeriodInSeconds =
DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).keyCacheSavePeriodInSeconds;
+
+ long start = System.currentTimeMillis();
+ logger.info(String.format("loading%s", msgSuffix));
+ // sort the results on read because there are few reads and many
writes and reads only happen at startup
+ Set<DecoratedKey> savedKeys =
readSavedCache(DatabaseDescriptor.getSerializedRowCachePath(table.name,
columnFamily));
+ for (DecoratedKey key : savedKeys)
+ cacheRow(key);
+ logger.info(String.format("completed loading (%d ms; %d keys) %s",
+ System.currentTimeMillis()-start,
ssTables.getRowCache().getSize(), msgSuffix));
+ if (rowCacheSavePeriodInSeconds > 0)
+ {
+ cacheSavingExecutor.scheduleWithFixedDelay(rowCacheSaverTask,
+
rowCacheSavePeriodInSeconds,
+
rowCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
+ }
+
+ if (keyCacheSavePeriodInSeconds > 0)
+ {
+ cacheSavingExecutor.scheduleWithFixedDelay(keyCacheSaverTask,
+
keyCacheSavePeriodInSeconds,
+
keyCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
+ }
+ }
+
+ public Future<?> submitRowCacheWrite()
+ {
+ return cacheSavingExecutor.submit(rowCacheSaverTask);
+ }
+
+ public Future<?> submitKeyCacheWrite()
+ {
+ return cacheSavingExecutor.submit(keyCacheSaverTask);
+ }
+
/**
* Collects a map of sstable components.
*/
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Thu Oct 7 02:24:22 2010
@@ -29,7 +29,9 @@ import org.apache.commons.lang.ArrayUtil
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Charsets.UTF_8;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.gms.FailureDetector;
@@ -37,15 +39,13 @@ import org.apache.cassandra.gms.Gossiper
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
+import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.service.IWriteResponseHandler;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import static com.google.common.base.Charsets.UTF_8;
-
/**
* For each endpoint for which we have hints, there is a row in the system
hints CF.
@@ -85,15 +85,7 @@ public class HintedHandOffManager
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new
NonBlockingHashSet<InetAddress>();
- private final ExecutorService executor_;
-
- public HintedHandOffManager()
- {
- int hhPriority = System.getProperty("cassandra.compaction.priority")
== null
- ? Thread.NORM_PRIORITY
- :
Integer.parseInt(System.getProperty("cassandra.compaction.priority"));
- executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff",
hhPriority);
- }
+ private final ExecutorService executor_ = new
JMXEnabledThreadPoolExecutor("HintedHandoff",
DatabaseDescriptor.getCompactionThreadPriority());
private static boolean sendMessage(InetAddress endpoint, String tableName,
String cfName, byte[] key) throws IOException
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Oct 7
02:24:22 2010
@@ -101,12 +101,16 @@ public class Table
// open and store the table
tableInstance = new Table(table);
instances.put(table, tableInstance);
+
+ //table has to be constructed and in the cache before
cacheRow can be called
+ for (ColumnFamilyStore cfs :
tableInstance.getColumnFamilyStores())
+ cfs.initRowCache();
}
}
}
return tableInstance;
}
-
+
public static Table clear(String table) throws IOException
{
synchronized (Table.class)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
Thu Oct 7 02:24:22 2010
@@ -102,7 +102,7 @@ public class Descriptor
}
/**
- * @see #fromFilename(directory, name)
+ * @see #fromFilename(File directory, String name)
*/
public static Descriptor fromFilename(String filename)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Thu Oct 7 02:24:22 2010
@@ -156,21 +156,27 @@ public class SSTableReader extends SSTab
public static SSTableReader open(Descriptor descriptor, Set<Component>
components, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
+ return open(descriptor, components,
Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
+ }
+
+ public static SSTableReader open(Descriptor descriptor, Set<Component>
components, Set<DecoratedKey> savedKeys, SSTableTracker tracker, CFMetaData
metadata, IPartitioner partitioner) throws IOException
+ {
assert partitioner != null;
long start = System.currentTimeMillis();
logger.info("Sampling index for " + descriptor);
SSTableReader sstable = new SSTableReader(descriptor, components,
metadata, partitioner, null, null, null, null, System.currentTimeMillis(),
null, null);
+ sstable.setTrackedBy(tracker);
// versions before 'c' encoded keys as utf-16 before hashing to the
filter
if (descriptor.hasStringsInBloomFilter)
{
- sstable.load(true);
+ sstable.load(true, savedKeys);
}
else
{
- sstable.load(false);
+ sstable.load(false, savedKeys);
sstable.loadBloomFilter();
}
sstable.loadStatistics(descriptor);
@@ -178,6 +184,9 @@ public class SSTableReader extends SSTab
if (logger.isDebugEnabled())
logger.debug("INDEX LOAD TIME for " + descriptor + ": " +
(System.currentTimeMillis() - start) + " ms.");
+ if (logger.isDebugEnabled() && sstable.getKeyCache() != null)
+ logger.debug(String.format("key cache contains %s/%s keys",
sstable.getKeyCache().getSize(), sstable.getKeyCache().getCapacity()));
+
return sstable;
}
@@ -217,9 +226,12 @@ public class SSTableReader extends SSTab
public void setTrackedBy(SSTableTracker tracker)
{
- phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
- finalizers.add(phantomReference);
- keyCache = tracker.getKeyCache();
+ if (tracker != null)
+ {
+ phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
+ finalizers.add(phantomReference);
+ keyCache = tracker.getKeyCache();
+ }
}
void loadBloomFilter() throws IOException
@@ -238,7 +250,7 @@ public class SSTableReader extends SSTab
/**
* Loads ifile, dfile and indexSummary, and optionally recreates the bloom
filter.
*/
- private void load(boolean recreatebloom) throws IOException
+ private void load(boolean recreatebloom, Set<DecoratedKey>
keysToLoadInCache) throws IOException
{
SegmentedFile.Builder ibuilder =
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
SegmentedFile.Builder dbuilder =
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
@@ -248,6 +260,9 @@ public class SSTableReader extends SSTab
BufferedRandomAccessFile input = new
BufferedRandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");
try
{
+ if (keyCache != null && keyCache.getCapacity() -
keyCache.getSize() < keysToLoadInCache.size())
+ keyCache.updateCapacity(keyCache.getSize() +
keysToLoadInCache.size());
+
long indexSize = input.length();
if (recreatebloom)
// estimate key count based on index length
@@ -266,6 +281,8 @@ public class SSTableReader extends SSTab
indexSummary.maybeAddEntry(decoratedKey, indexPosition);
ibuilder.addPotentialBoundary(indexPosition);
dbuilder.addPotentialBoundary(dataPosition);
+ if (keyCache != null &&
keysToLoadInCache.contains(decoratedKey))
+ keyCache.put(new Pair<Descriptor,
DecoratedKey>(descriptor, decoratedKey), dataPosition);
}
indexSummary.complete();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
Thu Oct 7 02:24:22 2010
@@ -19,18 +19,20 @@
package org.apache.cassandra.io.sstable;
+import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.cache.JMXInstrumentedCache;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class SSTableTracker implements Iterable<SSTableReader>
{
private static final Logger logger =
LoggerFactory.getLogger(SSTableTracker.class);
@@ -51,7 +53,62 @@ public class SSTableTracker implements I
this.cfname = cfname;
sstables = Collections.emptySet();
keyCache = new
JMXInstrumentedCache<Pair<Descriptor,DecoratedKey>,Long>(ksname, cfname +
"KeyCache", 0);
- rowCache = new JMXInstrumentedCache<DecoratedKey,
ColumnFamily>(ksname, cfname + "RowCache", 0);
+ rowCache = new JMXInstrumentedCache<DecoratedKey,
ColumnFamily>(ksname, cfname + "RowCache", 3);
+ }
+
+ protected class CacheWriter<K, V>
+ {
+ public void saveCache(JMXInstrumentedCache<K, V> cache, File
savedCachePath, Function<K, byte[]> converter) throws IOException
+ {
+ long start = System.currentTimeMillis();
+ String msgSuffix = " " + savedCachePath.getName() + " for " +
cfname + " of " + ksname;
+ logger.debug("saving" + msgSuffix);
+ int count = 0;
+ File tmpFile = File.createTempFile(savedCachePath.getName(), null,
savedCachePath.getParentFile());
+ FileOutputStream fout = new FileOutputStream(tmpFile);
+ ObjectOutputStream out = new ObjectOutputStream(new
BufferedOutputStream(fout));
+ FileDescriptor fd = fout.getFD();
+ for (K key : cache.getKeySet())
+ {
+ byte[] bytes = converter.apply(key);
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ ++count;
+ }
+ out.flush();
+ fd.sync();
+ out.close();
+ if (!tmpFile.renameTo(savedCachePath))
+ throw new IOException("Unable to rename cache to " +
savedCachePath);
+ if (logger.isDebugEnabled())
+ logger.debug("saved " + count + " keys in " +
(System.currentTimeMillis() - start) + " ms from" + msgSuffix);
+ }
+ }
+
+ public void saveKeyCache() throws IOException
+ {
+ Function<Pair<Descriptor, DecoratedKey>, byte[]> function = new
Function<Pair<Descriptor, DecoratedKey>, byte[]>()
+ {
+ public byte[] apply(Pair<Descriptor, DecoratedKey> key)
+ {
+ return key.right.key;
+ }
+ };
+ CacheWriter<Pair<Descriptor, DecoratedKey>, Long> writer = new
CacheWriter<Pair<Descriptor, DecoratedKey>, Long>();
+ writer.saveCache(keyCache,
DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+ }
+
+ public void saveRowCache() throws IOException
+ {
+ Function<DecoratedKey, byte[]> function = new Function<DecoratedKey,
byte[]>()
+ {
+ public byte[] apply(DecoratedKey key)
+ {
+ return key.key;
+ }
+ };
+ CacheWriter<DecoratedKey, ColumnFamily> writer = new
CacheWriter<DecoratedKey, ColumnFamily>();
+ writer.saveCache(rowCache,
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
}
public synchronized void replace(Collection<SSTableReader> oldSSTables,
Iterable<SSTableReader> replacements)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Thu Oct 7 02:24:22 2010
@@ -25,13 +25,11 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +60,7 @@ import org.apache.cassandra.thrift.Unava
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SkipNullRepresenter;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.log4j.Level;
import org.yaml.snakeyaml.Dumper;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
@@ -2051,4 +2050,18 @@ public class StorageService implements I
{
StorageProxy.truncateBlocking(keyspace, columnFamily);
}
+
+ public void saveCaches() throws ExecutionException, InterruptedException
+ {
+ List<Future<?>> futures = new ArrayList<Future<?>>();
+ logger_.debug("submitting cache saves");
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ futures.add(cfs.submitKeyCacheWrite());
+ futures.add(cfs.submitRowCacheWrite());
+ }
+ FBUtilities.waitOnFutures(futures);
+ logger_.debug("cache saves completed");
+ }
+
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Thu Oct 7 02:24:22 2010
@@ -19,6 +19,7 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@@ -31,8 +32,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.thrift.UnavailableException;
-import java.net.InetAddress;
-
public interface StorageServiceMBean
{
@@ -257,4 +256,7 @@ public interface StorageServiceMBean
/** force hint delivery to an endpoint **/
public void deliverHints(String host) throws UnknownHostException;
+
+ /** save row and key caches */
+ public void saveCaches() throws ExecutionException, InterruptedException;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu
Oct 7 02:24:22 2010
@@ -538,7 +538,7 @@ public class FBUtilities
return System.currentTimeMillis() * 1000;
}
- public static void waitOnFutures(Collection<Future<?>> futures)
+ public static void waitOnFutures(Iterable<Future<?>> futures)
{
for (Future f : futures)
{
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1005317&r1=1005316&r2=1005317&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Thu Oct 7 02:24:22 2010
@@ -13,6 +13,7 @@ storage_port: 7010
rpc_port: 9170
column_index_size_in_kb: 4
commitlog_directory: build/test/cassandra/commitlog
+saved_caches_directory: /var/lib/cassandra/saved_caches
commitlog_rotation_threshold_in_mb: 128
data_file_directories:
- build/test/cassandra/data