Author: brandonwilliams Date: Wed Mar 16 17:52:53 2011 New Revision: 1082236
URL: http://svn.apache.org/viewvc?rev=1082236&view=rev Log: Refactor row/key cache handling. Patch by slebresne, reviewed by Matthew Dennis for CASSANDRA-2272 Removed: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Mar 16 17:52:53 2011 @@ -34,6 +34,7 @@ import org.apache.cassandra.auth.AllowAl import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.IAuthority; import org.apache.cassandra.config.Config.RequestSchedulerId; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.db.DefsTable; import org.apache.cassandra.db.Table; @@ -997,26 +998,6 @@ public class DatabaseDescriptor return getCFMetaData(tableName, cfName).subcolumnComparator; } - /** - * @return The absolute number of keys that should be cached per table. - */ - public static int getKeysCachedFor(String tableName, String columnFamilyName, long expectedKeys) - { - CFMetaData cfm = getCFMetaData(tableName, columnFamilyName); - double v = (cfm == null) ? CFMetaData.DEFAULT_KEY_CACHE_SIZE : cfm.getKeyCacheSize(); - return (int)Math.min(FBUtilities.absoluteFromFraction(v, expectedKeys), Integer.MAX_VALUE); - } - - /** - * @return The absolute number of rows that should be cached for the columnfamily. - */ - public static int getRowsCachedFor(String tableName, String columnFamilyName, long expectedRows) - { - CFMetaData cfm = getCFMetaData(tableName, columnFamilyName); - double v = (cfm == null) ? CFMetaData.DEFAULT_ROW_CACHE_SIZE : cfm.getRowCacheSize(); - return (int)Math.min(FBUtilities.absoluteFromFraction(v, expectedRows), Integer.MAX_VALUE); - } - public static KSMetaData getTableDefinition(String table) { return tables.get(table); @@ -1156,14 +1137,9 @@ public class DatabaseDescriptor return conf.index_interval; } - public static File getSerializedRowCachePath(String ksName, String cfName) - { - return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-RowCache"); - } - - public static File getSerializedKeyCachePath(String ksName, String cfName) + public static File getSerializedCachePath(String ksName, String cfName, ColumnFamilyStore.CacheType cacheType) { - return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-KeyCache"); + return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType); } public static int getDynamicUpdateInterval() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Mar 16 17:52:53 2011 @@ -35,6 +35,9 @@ import org.apache.commons.lang.StringUti import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.AutoSavingKeyCache; +import org.apache.cassandra.cache.AutoSavingRowCache; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; @@ -92,6 +95,7 @@ public class ColumnFamilyStore implement new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()), new NamedThreadFactory("FlushWriter"), "internal"); + public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher"); private Set<Memtable> memtablesPendingFlush = new ConcurrentSkipListSet<Memtable>(); @@ -139,9 +143,27 @@ public class ColumnFamilyStore implement private volatile DefaultInteger rowCacheSaveInSeconds; private volatile DefaultInteger keyCacheSaveInSeconds; - // Locally held row/key cache scheduled tasks - private volatile ScheduledFuture<?> saveRowCacheTask; - private volatile ScheduledFuture<?> saveKeyCacheTask; + public static enum CacheType + { + KEY_CACHE_TYPE("KeyCache"), + ROW_CACHE_TYPE("RowCache"); + + public final String name; + + private CacheType(String name) + { + this.name = name; + } + + @Override + public String toString() + { + return name; + } + } + + public final AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> keyCache; + public final AutoSavingCache<DecoratedKey, ColumnFamily> rowCache; public void reload() { @@ -218,9 +240,12 @@ public class ColumnFamilyStore implement if (logger.isDebugEnabled()) logger.debug("Starting CFS {}", columnFamily); + keyCache = new AutoSavingKeyCache<Pair<Descriptor, DecoratedKey>, Long>(table.name, columnFamilyName, 0); + rowCache = new AutoSavingRowCache<DecoratedKey, ColumnFamily>(table.name, columnFamilyName, 0); + // scan for sstables corresponding to this cf and load them - ssTables = new SSTableTracker(table.name, columnFamilyName); - Set<DecoratedKey> savedKeys = readSavedCache(DatabaseDescriptor.getSerializedKeyCachePath(table.name, columnFamilyName)); + ssTables = new SSTableTracker(this); + Set<DecoratedKey> savedKeys = keyCache.readSaved(); List<SSTableReader> sstables = new ArrayList<SSTableReader>(); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false).entrySet()) { @@ -266,53 +291,6 @@ public class ColumnFamilyStore implement } } - protected Set<DecoratedKey> readSavedCache(File path) - { - Set<DecoratedKey> keys = new TreeSet<DecoratedKey>(); - if (path.exists()) - { - DataInputStream in = null; - try - { - long start = System.currentTimeMillis(); - - logger.info(String.format("reading saved cache %s", path)); - in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); - while (in.available() > 0) - { - int size = in.readInt(); - byte[] bytes = new byte[size]; - in.readFully(bytes); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - DecoratedKey key; - try - { - key = StorageService.getPartitioner().decorateKey(buffer); - } - catch (Exception e) - { - logger.info(String.format("unable to read entry #%s from saved cache %s; skipping remaining entries", - keys.size(), path.getAbsolutePath()), e); - break; - } - keys.add(key); - } - if (logger.isDebugEnabled()) - logger.debug(String.format("completed reading (%d ms; %d keys) saved cache %s", - System.currentTimeMillis() - start, keys.size(), path)); - } - catch (IOException ioe) - { - logger.warn(String.format("error reading saved cache %s", path.getAbsolutePath()), ioe); - } - finally - { - FileUtils.closeQuietly(in); - } - } - return keys; - } - public boolean reverseReadWriteOrder() { //XXX: PURPOSE: allow less harmful race condition w/o locking @@ -575,76 +553,26 @@ public class ColumnFamilyStore implement } // must be called after all sstables are loaded since row cache merges all row versions - public void initRowCache() + public void initCaches() { - int rowCacheSavePeriodInSeconds = DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).getRowCacheSavePeriodInSeconds(); - int keyCacheSavePeriodInSeconds = DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).getKeyCacheSavePeriodInSeconds(); - long start = System.currentTimeMillis(); - // sort the results on read because there are few reads and many writes and reads only happen at startup - Set<DecoratedKey> savedKeys = readSavedCache(DatabaseDescriptor.getSerializedRowCachePath(table.name, columnFamily)); - for (DecoratedKey key : savedKeys) + // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup + for (DecoratedKey key : rowCache.readSaved()) cacheRow(key); - if (ssTables.getRowCache().getSize() > 0) + if (rowCache.getSize() > 0) logger.info(String.format("completed loading (%d ms; %d keys) row cache for %s.%s", System.currentTimeMillis()-start, - ssTables.getRowCache().getSize(), + rowCache.getSize(), table.name, columnFamily)); - scheduleCacheSaving(rowCacheSavePeriodInSeconds, keyCacheSavePeriodInSeconds); - } - - public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds) - { - if (saveRowCacheTask != null) - { - saveRowCacheTask.cancel(false); // Do not interrupt an in-progress save - saveRowCacheTask = null; - } - if (rowCacheSavePeriodInSeconds > 0) - { - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() - { - submitRowCacheWrite(); - } - }; - saveRowCacheTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, - rowCacheSavePeriodInSeconds, - rowCacheSavePeriodInSeconds, - TimeUnit.SECONDS); - } - - if (saveKeyCacheTask != null) - { - saveKeyCacheTask.cancel(false); // Do not interrupt an in-progress save - saveKeyCacheTask = null; - } - if (keyCacheSavePeriodInSeconds > 0) - { - Runnable runnable = new WrappedRunnable() - { - public void runMayThrow() - { - submitKeyCacheWrite(); - } - }; - saveKeyCacheTask = StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, - keyCacheSavePeriodInSeconds, - keyCacheSavePeriodInSeconds, - TimeUnit.SECONDS); - } - } - public Future<?> submitRowCacheWrite() - { - return CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter()); + scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds()); } - public Future<?> submitKeyCacheWrite() + public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds) { - return CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter()); + keyCache.scheduleSaving(keyCacheSavePeriodInSeconds); + rowCache.scheduleSaving(rowCacheSavePeriodInSeconds); } /** @@ -1213,7 +1141,7 @@ public class ColumnFamilyStore implement private ColumnFamily cacheRow(DecoratedKey key) { ColumnFamily cached; - if ((cached = ssTables.getRowCache().get(key)) == null) + if ((cached = rowCache.get(key)) == null) { cached = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)), Integer.MIN_VALUE); if (cached == null) @@ -1228,7 +1156,7 @@ public class ColumnFamilyStore implement } // avoid keeping a permanent reference to the original key buffer - ssTables.getRowCache().put(new DecoratedKey(key.token, ByteBufferUtil.clone(key.key)), cached); + rowCache.put(new DecoratedKey(key.token, ByteBufferUtil.clone(key.key)), cached); } return cached; } @@ -1240,7 +1168,7 @@ public class ColumnFamilyStore implement long start = System.nanoTime(); try { - if (ssTables.getRowCache().getCapacity() == 0) + if (rowCache.getCapacity() == 0) { ColumnFamily cf = getTopLevelColumns(filter, gcBefore); @@ -1799,12 +1727,12 @@ public class ColumnFamilyStore implement /** raw cached row -- does not fetch the row if it is not present. not counted in cache statistics. */ public ColumnFamily getRawCachedRow(DecoratedKey key) { - return ssTables.getRowCache().getCapacity() == 0 ? null : ssTables.getRowCache().getInternal(key); + return rowCache.getCapacity() == 0 ? null : rowCache.getInternal(key); } public void invalidateCachedRow(DecoratedKey key) { - ssTables.getRowCache().remove(key); + rowCache.remove(key); } public void forceMajorCompaction() throws InterruptedException, ExecutionException @@ -1814,32 +1742,32 @@ public class ColumnFamilyStore implement public void invalidateRowCache() { - ssTables.getRowCache().clear(); + rowCache.clear(); } public void invalidateKeyCache() { - ssTables.getKeyCache().clear(); + keyCache.clear(); } public int getRowCacheCapacity() { - return ssTables.getRowCache().getCapacity(); + return rowCache.getCapacity(); } public int getKeyCacheCapacity() { - return ssTables.getKeyCache().getCapacity(); + return keyCache.getCapacity(); } public int getRowCacheSize() { - return ssTables.getRowCache().getSize(); + return rowCache.getSize(); } public int getKeyCacheSize() { - return ssTables.getKeyCache().getSize(); + return keyCache.getSize(); } public static Iterable<ColumnFamilyStore> all() @@ -2216,21 +2144,8 @@ public class ColumnFamilyStore implement */ public void reduceCacheSizes() { - if (ssTables.getRowCache().getCapacity() > 0) - { - int newCapacity = (int) (DatabaseDescriptor.getReduceCacheCapacityTo() * ssTables.getRowCache().getSize()); - logger.warn(String.format("Reducing %s row cache capacity from %d to %s to reduce memory pressure", - columnFamily, ssTables.getRowCache().getCapacity(), newCapacity)); - ssTables.getRowCache().setCapacity(newCapacity); - } - - if (ssTables.getKeyCache().getCapacity() > 0) - { - int newCapacity = (int) (DatabaseDescriptor.getReduceCacheCapacityTo() * ssTables.getKeyCache().getSize()); - logger.warn(String.format("Reducing %s key cache capacity from %d to %s to reduce memory pressure", - columnFamily, ssTables.getKeyCache().getCapacity(), newCapacity)); - ssTables.getKeyCache().setCapacity(newCapacity); - } + rowCache.reduceCacheSize(); + keyCache.reduceCacheSize(); } private ByteBuffer intern(ByteBuffer name) @@ -2276,4 +2191,5 @@ public class ColumnFamilyStore implement { return Iterables.concat(indexedColumns.values(), Collections.singleton(this)); } + } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Mar 16 17:52:53 2011 @@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUti import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; @@ -50,9 +51,9 @@ import org.apache.cassandra.service.Anti import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.OperationType; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.NodeId; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.cassandra.utils.NodeId; import org.cliffc.high_scale_lib.NonBlockingHashMap; public class CompactionManager implements CompactionManagerMBean @@ -958,7 +959,7 @@ public class CompactionManager implement return executor.submit(callable); } - public Future<?> submitCacheWrite(final CacheWriter writer) + public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer) { Runnable runnable = new WrappedRunnable() { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Mar 16 17:52:53 2011 @@ -114,7 +114,7 @@ public class Table //table has to be constructed and in the cache before cacheRow can be called for (ColumnFamilyStore cfs : tableInstance.getColumnFamilyStores()) - cfs.initRowCache(); + cfs.initCaches(); } } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Mar 16 17:52:53 2011 @@ -360,14 +360,6 @@ public class SSTableReader extends SSTab } /** - * @return The key cache: for monitoring purposes. - */ - public InstrumentedCache getKeyCache() - { - return keyCache; - } - - /** * @return An estimate of the number of keys in this SSTable. */ public long estimatedKeys() @@ -678,4 +670,9 @@ public class SSTableReader extends SSTab { return bloomFilterTracker.getRecentTruePositiveCount(); } + + public InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache() + { + return keyCache; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Wed Mar 16 17:52:53 2011 @@ -19,18 +19,15 @@ package org.apache.cassandra.io.sstable; -import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Function; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.JMXInstrumentedCache; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.utils.Pair; @@ -42,43 +39,12 @@ public class SSTableTracker implements I private final AtomicLong liveSize = new AtomicLong(); private final AtomicLong totalSize = new AtomicLong(); - private final String ksname; - private final String cfname; - - private final JMXInstrumentedCache<Pair<Descriptor,DecoratedKey>,Long> keyCache; - private final JMXInstrumentedCache<DecoratedKey, ColumnFamily> rowCache; + private final ColumnFamilyStore cfs; - public SSTableTracker(String ksname, String cfname) + public SSTableTracker(ColumnFamilyStore cfs) { - this.ksname = ksname; - this.cfname = cfname; + this.cfs = cfs; sstables = Collections.emptySet(); - keyCache = new JMXInstrumentedCache<Pair<Descriptor,DecoratedKey>,Long>(ksname, cfname + "KeyCache", 0); - rowCache = new JMXInstrumentedCache<DecoratedKey, ColumnFamily>(ksname, cfname + "RowCache", 3); - } - - public CacheWriter<Pair<Descriptor, DecoratedKey>, Long> getKeyCacheWriter() - { - Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>() - { - public ByteBuffer apply(Pair<Descriptor, DecoratedKey> key) - { - return key.right.key; - } - }; - return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname, keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function); - } - - public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter() - { - Function<DecoratedKey, ByteBuffer> function = new Function<DecoratedKey, ByteBuffer>() - { - public ByteBuffer apply(DecoratedKey key) - { - return key.key; - } - }; - return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function); } public synchronized void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements) @@ -90,7 +56,7 @@ public class SSTableTracker implements I assert sstable.getKeySamples() != null; if (logger.isDebugEnabled()) logger.debug(String.format("adding %s to list of files tracked for %s.%s", - sstable.descriptor, ksname, cfname)); + sstable.descriptor, cfs.table.name, cfs.getColumnFamilyName())); sstablesNew.add(sstable); long size = sstable.bytesOnDisk(); liveSize.addAndGet(size); @@ -103,7 +69,7 @@ public class SSTableTracker implements I { if (logger.isDebugEnabled()) logger.debug(String.format("removing %s from list of files tracked for %s.%s", - sstable.descriptor, ksname, cfname)); + sstable.descriptor, cfs.table.name, cfs.getColumnFamilyName())); boolean removed = sstablesNew.remove(sstable); assert removed; sstable.markCompacted(); @@ -132,29 +98,8 @@ public class SSTableTracker implements I public synchronized void updateCacheSizes() { long keys = estimatedKeys(); - - if (!keyCache.isCapacitySetManually()) - { - int keyCacheSize = DatabaseDescriptor.getKeysCachedFor(ksname, cfname, keys); - if (keyCacheSize != keyCache.getCapacity()) - { - // update cache size for the new key volume - if (logger.isDebugEnabled()) - logger.debug("key cache capacity for " + cfname + " is " + keyCacheSize); - keyCache.updateCapacity(keyCacheSize); - } - } - - if (!rowCache.isCapacitySetManually()) - { - int rowCacheSize = DatabaseDescriptor.getRowsCachedFor(ksname, cfname, keys); - if (rowCacheSize != rowCache.getCapacity()) - { - if (logger.isDebugEnabled()) - logger.debug("row cache capacity for " + cfname + " is " + rowCacheSize); - rowCache.updateCapacity(rowCacheSize); - } - } + cfs.keyCache.updateCacheSize(keys); + cfs.rowCache.updateCacheSize(keys); } // the modifiers create new, unmodifiable objects each time; the volatile fences the assignment @@ -179,9 +124,14 @@ public class SSTableTracker implements I sstables = Collections.emptySet(); } + public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> getKeyCache() + { + return cfs.keyCache; + } + public JMXInstrumentedCache<DecoratedKey, ColumnFamily> getRowCache() { - return rowCache; + return cfs.rowCache; } public long estimatedKeys() @@ -208,10 +158,5 @@ public class SSTableTracker implements I { totalSize.addAndGet(-size); } - - public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> getKeyCache() - { - return keyCache; - } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1082236&r1=1082235&r2=1082236&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Mar 16 17:52:53 2011 @@ -34,18 +34,16 @@ import com.google.common.collect.ArrayLi import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; - -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.utils.*; -import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.*; +import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.migration.AddKeyspace; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.dht.BootStrapper; @@ -55,6 +53,10 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.DeletionService; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncResult; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -63,6 +65,8 @@ import org.apache.cassandra.service.Anti import org.apache.cassandra.streaming.*; import org.apache.cassandra.thrift.Constants; import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.*; +import org.apache.log4j.Level; import org.yaml.snakeyaml.Dumper; import org.yaml.snakeyaml.DumperOptions; import org.yaml.snakeyaml.Yaml; @@ -2273,8 +2277,8 @@ public class StorageService implements I logger_.debug("submitting cache saves"); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - futures.add(cfs.submitKeyCacheWrite()); - futures.add(cfs.submitRowCacheWrite()); + futures.add(cfs.keyCache.submitWrite()); + futures.add(cfs.rowCache.submitWrite()); } FBUtilities.waitOnFutures(futures); logger_.debug("cache saves completed");
