Author: brandonwilliams
Date: Wed Mar 16 17:52:53 2011
New Revision: 1082236

URL: http://svn.apache.org/viewvc?rev=1082236&view=rev
Log:
Refactor row/key cache handling.
Patch by slebresne, reviewed by Matthew Dennis for CASSANDRA-2272

Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Mar 16 17:52:53 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.auth.AllowAl
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthority;
 import org.apache.cassandra.config.Config.RequestSchedulerId;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.DefsTable;
 import org.apache.cassandra.db.Table;
@@ -997,26 +998,6 @@ public class DatabaseDescriptor
         return getCFMetaData(tableName, cfName).subcolumnComparator;
     }
 
-    /**
-     * @return The absolute number of keys that should be cached per table.
-     */
-    public static int getKeysCachedFor(String tableName, String columnFamilyName, long expectedKeys)
-    {
-        CFMetaData cfm = getCFMetaData(tableName, columnFamilyName);
-        double v = (cfm == null) ? CFMetaData.DEFAULT_KEY_CACHE_SIZE : cfm.getKeyCacheSize();
-        return (int)Math.min(FBUtilities.absoluteFromFraction(v, expectedKeys), Integer.MAX_VALUE);
-    }
-
-    /**
-     * @return The absolute number of rows that should be cached for the columnfamily.
-     */
-    public static int getRowsCachedFor(String tableName, String columnFamilyName, long expectedRows)
-    {
-        CFMetaData cfm = getCFMetaData(tableName, columnFamilyName);
-        double v = (cfm == null) ? CFMetaData.DEFAULT_ROW_CACHE_SIZE : cfm.getRowCacheSize();
-        return (int)Math.min(FBUtilities.absoluteFromFraction(v, expectedRows), Integer.MAX_VALUE);
-    }
-
     public static KSMetaData getTableDefinition(String table)
     {
         return tables.get(table);
@@ -1156,14 +1137,9 @@ public class DatabaseDescriptor
         return conf.index_interval;
     }
 
-    public static File getSerializedRowCachePath(String ksName, String cfName)
-    {
-        return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-RowCache");
-    }
-
-    public static File getSerializedKeyCachePath(String ksName, String cfName)
+    public static File getSerializedCachePath(String ksName, String cfName, ColumnFamilyStore.CacheType cacheType)
     {
-        return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-KeyCache");
+        return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType);
     }
 
     public static int getDynamicUpdateInterval()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Mar 16 17:52:53 2011
@@ -35,6 +35,9 @@ import org.apache.commons.lang.StringUti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.cache.AutoSavingKeyCache;
+import org.apache.cassandra.cache.AutoSavingRowCache;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -92,6 +95,7 @@ public class ColumnFamilyStore implement
                                                new 
LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
                                                new 
NamedThreadFactory("FlushWriter"),
                                                "internal");
+
     public static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
     
     private Set<Memtable> memtablesPendingFlush = new 
ConcurrentSkipListSet<Memtable>();
@@ -139,9 +143,27 @@ public class ColumnFamilyStore implement
     private volatile DefaultInteger rowCacheSaveInSeconds;
     private volatile DefaultInteger keyCacheSaveInSeconds;
 
-    // Locally held row/key cache scheduled tasks
-    private volatile ScheduledFuture<?> saveRowCacheTask;
-    private volatile ScheduledFuture<?> saveKeyCacheTask;
+    public static enum CacheType
+    {
+        KEY_CACHE_TYPE("KeyCache"),
+        ROW_CACHE_TYPE("RowCache");
+
+        public final String name;
+
+        private CacheType(String name)
+        {
+            this.name = name;
+        }
+
+        @Override
+        public String toString()
+        {
+            return name;
+        }
+    }
+
+    public final AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
+    public final AutoSavingCache<DecoratedKey, ColumnFamily> rowCache;
 
     public void reload()
     {
@@ -218,9 +240,12 @@ public class ColumnFamilyStore implement
         if (logger.isDebugEnabled())
             logger.debug("Starting CFS {}", columnFamily);
 
+        keyCache = new AutoSavingKeyCache<Pair<Descriptor, DecoratedKey>, Long>(table.name, columnFamilyName, 0);
+        rowCache = new AutoSavingRowCache<DecoratedKey, ColumnFamily>(table.name, columnFamilyName, 0);
+
         // scan for sstables corresponding to this cf and load them
-        ssTables = new SSTableTracker(table.name, columnFamilyName);
-        Set<DecoratedKey> savedKeys = readSavedCache(DatabaseDescriptor.getSerializedKeyCachePath(table.name, columnFamilyName));
+        ssTables = new SSTableTracker(this);
+        Set<DecoratedKey> savedKeys = keyCache.readSaved();
         List<SSTableReader> sstables = new ArrayList<SSTableReader>();
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false).entrySet())
         {
@@ -266,53 +291,6 @@ public class ColumnFamilyStore implement
         }
     }
 
-    protected Set<DecoratedKey> readSavedCache(File path)
-    {
-        Set<DecoratedKey> keys = new TreeSet<DecoratedKey>();
-        if (path.exists())
-        {
-            DataInputStream in = null;
-            try
-            {
-                long start = System.currentTimeMillis();
-
-                logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new BufferedInputStream(new 
FileInputStream(path)));
-                while (in.available() > 0)
-                {
-                    int size = in.readInt();
-                    byte[] bytes = new byte[size];
-                    in.readFully(bytes);
-                    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                    DecoratedKey key;
-                    try
-                    {
-                        key = 
StorageService.getPartitioner().decorateKey(buffer);
-                    }
-                    catch (Exception e)
-                    {
-                        logger.info(String.format("unable to read entry #%s 
from saved cache %s; skipping remaining entries",
-                                                  keys.size(), 
path.getAbsolutePath()), e);
-                        break;
-                    }
-                    keys.add(key);
-                }
-                if (logger.isDebugEnabled())
-                    logger.debug(String.format("completed reading (%d ms; %d 
keys) saved cache %s",
-                                               System.currentTimeMillis() - 
start, keys.size(), path));
-            }
-            catch (IOException ioe)
-            {
-                logger.warn(String.format("error reading saved cache %s", 
path.getAbsolutePath()), ioe);
-            }
-            finally
-            {
-                FileUtils.closeQuietly(in);
-            }
-        }
-        return keys;
-    }
-
     public boolean reverseReadWriteOrder()
     {
         //XXX: PURPOSE: allow less harmful race condition w/o locking
@@ -575,76 +553,26 @@ public class ColumnFamilyStore implement
     }
 
     // must be called after all sstables are loaded since row cache merges all 
row versions
-    public void initRowCache()
+    public void initCaches()
     {
-        int rowCacheSavePeriodInSeconds = 
DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).getRowCacheSavePeriodInSeconds();
-        int keyCacheSavePeriodInSeconds = 
DatabaseDescriptor.getTableMetaData(table.name).get(columnFamily).getKeyCacheSavePeriodInSeconds();
-
         long start = System.currentTimeMillis();
-        // sort the results on read because there are few reads and many 
writes and reads only happen at startup
-        Set<DecoratedKey> savedKeys = 
readSavedCache(DatabaseDescriptor.getSerializedRowCachePath(table.name, 
columnFamily));
-        for (DecoratedKey key : savedKeys)
+        // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup
+        for (DecoratedKey key : rowCache.readSaved())
             cacheRow(key);
-        if (ssTables.getRowCache().getSize() > 0)
+        if (rowCache.getSize() > 0)
             logger.info(String.format("completed loading (%d ms; %d keys) row 
cache for %s.%s",
                                       System.currentTimeMillis()-start,
-                                      ssTables.getRowCache().getSize(),
+                                      rowCache.getSize(),
                                       table.name,
                                       columnFamily));
-        scheduleCacheSaving(rowCacheSavePeriodInSeconds, 
keyCacheSavePeriodInSeconds);
-    }
-
-    public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int 
keyCacheSavePeriodInSeconds)
-    {
-        if (saveRowCacheTask != null)
-        {
-            saveRowCacheTask.cancel(false); // Do not interrupt an in-progress 
save
-            saveRowCacheTask = null;
-        }
-        if (rowCacheSavePeriodInSeconds > 0)
-        {
-            Runnable runnable = new WrappedRunnable()
-            {
-                public void runMayThrow()
-                {
-                    submitRowCacheWrite();
-                }
-            };
-            saveRowCacheTask = 
StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
-                                                                               
     rowCacheSavePeriodInSeconds,
-                                                                               
     rowCacheSavePeriodInSeconds,
-                                                                               
     TimeUnit.SECONDS);
-        }
-
-        if (saveKeyCacheTask != null)
-        {
-            saveKeyCacheTask.cancel(false); // Do not interrupt an in-progress 
save
-            saveKeyCacheTask = null;
-        }
-        if (keyCacheSavePeriodInSeconds > 0)
-        {
-            Runnable runnable = new WrappedRunnable()
-            {
-                public void runMayThrow()
-                {
-                    submitKeyCacheWrite();
-                }
-            };
-            saveKeyCacheTask = 
StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
-                                                                               
     keyCacheSavePeriodInSeconds,
-                                                                               
     keyCacheSavePeriodInSeconds,
-                                                                               
     TimeUnit.SECONDS);
-        }
-    }
 
-    public Future<?> submitRowCacheWrite()
-    {
-        return 
CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter());
+        scheduleCacheSaving(metadata.getRowCacheSavePeriodInSeconds(), metadata.getKeyCacheSavePeriodInSeconds());
     }
 
-    public Future<?> submitKeyCacheWrite()
+    public void scheduleCacheSaving(int rowCacheSavePeriodInSeconds, int keyCacheSavePeriodInSeconds)
     {
-        return 
CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter());
+        keyCache.scheduleSaving(keyCacheSavePeriodInSeconds);
+        rowCache.scheduleSaving(rowCacheSavePeriodInSeconds);
     }
 
     /**
@@ -1213,7 +1141,7 @@ public class ColumnFamilyStore implement
     private ColumnFamily cacheRow(DecoratedKey key)
     {
         ColumnFamily cached;
-        if ((cached = ssTables.getRowCache().get(key)) == null)
+        if ((cached = rowCache.get(key)) == null)
         {
             cached = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new 
QueryPath(columnFamily)), Integer.MIN_VALUE);
             if (cached == null)
@@ -1228,7 +1156,7 @@ public class ColumnFamilyStore implement
             }
 
             // avoid keeping a permanent reference to the original key buffer
-            ssTables.getRowCache().put(new DecoratedKey(key.token, ByteBufferUtil.clone(key.key)), cached);
+            rowCache.put(new DecoratedKey(key.token, ByteBufferUtil.clone(key.key)), cached);
         }
         return cached;
     }
@@ -1240,7 +1168,7 @@ public class ColumnFamilyStore implement
         long start = System.nanoTime();
         try
         {
-            if (ssTables.getRowCache().getCapacity() == 0)
+            if (rowCache.getCapacity() == 0)
             {
                 ColumnFamily cf = getTopLevelColumns(filter, gcBefore);
                          
@@ -1799,12 +1727,12 @@ public class ColumnFamilyStore implement
     /** raw cached row -- does not fetch the row if it is not present.  not 
counted in cache statistics.  */
     public ColumnFamily getRawCachedRow(DecoratedKey key)
     {
-        return ssTables.getRowCache().getCapacity() == 0 ? null : ssTables.getRowCache().getInternal(key);
+        return rowCache.getCapacity() == 0 ? null : rowCache.getInternal(key);
     }
 
     public void invalidateCachedRow(DecoratedKey key)
     {
-        ssTables.getRowCache().remove(key);
+        rowCache.remove(key);
     }
 
     public void forceMajorCompaction() throws InterruptedException, 
ExecutionException
@@ -1814,32 +1742,32 @@ public class ColumnFamilyStore implement
 
     public void invalidateRowCache()
     {
-        ssTables.getRowCache().clear();
+        rowCache.clear();
     }
 
     public void invalidateKeyCache()
     {
-        ssTables.getKeyCache().clear();
+        keyCache.clear();
     }
 
     public int getRowCacheCapacity()
     {
-        return ssTables.getRowCache().getCapacity();
+        return rowCache.getCapacity();
     }
 
     public int getKeyCacheCapacity()
     {
-        return ssTables.getKeyCache().getCapacity();
+        return keyCache.getCapacity();
     }
 
     public int getRowCacheSize()
     {
-        return ssTables.getRowCache().getSize();
+        return rowCache.getSize();
     }
 
     public int getKeyCacheSize()
     {
-        return ssTables.getKeyCache().getSize();
+        return keyCache.getSize();
     }
 
     public static Iterable<ColumnFamilyStore> all()
@@ -2216,21 +2144,8 @@ public class ColumnFamilyStore implement
      */
     public void reduceCacheSizes()
     {
-        if (ssTables.getRowCache().getCapacity() > 0)
-        {
-            int newCapacity = (int) 
(DatabaseDescriptor.getReduceCacheCapacityTo() * 
ssTables.getRowCache().getSize());
-            logger.warn(String.format("Reducing %s row cache capacity from %d 
to %s to reduce memory pressure",
-                                      columnFamily, 
ssTables.getRowCache().getCapacity(), newCapacity));
-            ssTables.getRowCache().setCapacity(newCapacity);
-        }
-
-        if (ssTables.getKeyCache().getCapacity() > 0)
-        {
-            int newCapacity = (int) 
(DatabaseDescriptor.getReduceCacheCapacityTo() * 
ssTables.getKeyCache().getSize());
-            logger.warn(String.format("Reducing %s key cache capacity from %d 
to %s to reduce memory pressure",
-                                      columnFamily, 
ssTables.getKeyCache().getCapacity(), newCapacity));
-            ssTables.getKeyCache().setCapacity(newCapacity);
-        }
+        rowCache.reduceCacheSize();
+        keyCache.reduceCacheSize();
     }
 
     private ByteBuffer intern(ByteBuffer name)
@@ -2276,4 +2191,5 @@ public class ColumnFamilyStore implement
     {
         return Iterables.concat(indexedColumns.values(), 
Collections.singleton(this));
     }
+
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Mar 16 17:52:53 2011
@@ -39,6 +39,7 @@ import org.apache.commons.lang.StringUti
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
@@ -50,9 +51,9 @@ import org.apache.cassandra.service.Anti
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.NodeId;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.utils.NodeId;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class CompactionManager implements CompactionManagerMBean
@@ -958,7 +959,7 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    public Future<?> submitCacheWrite(final CacheWriter writer)
+    public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
     {
         Runnable runnable = new WrappedRunnable()
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Mar 16 17:52:53 2011
@@ -114,7 +114,7 @@ public class Table
 
                     //table has to be constructed and in the cache before 
cacheRow can be called
                     for (ColumnFamilyStore cfs : 
tableInstance.getColumnFamilyStores())
-                        cfs.initRowCache();
+                        cfs.initCaches();
                 }
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Mar 16 17:52:53 2011
@@ -360,14 +360,6 @@ public class SSTableReader extends SSTab
     }
 
     /**
-     * @return The key cache: for monitoring purposes.
-     */
-    public InstrumentedCache getKeyCache()
-    {
-        return keyCache;
-    }
-
-    /**
      * @return An estimate of the number of keys in this SSTable.
      */
     public long estimatedKeys()
@@ -678,4 +670,9 @@ public class SSTableReader extends SSTab
     {
         return bloomFilterTracker.getRecentTruePositiveCount();
     }
+
+    public InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> getKeyCache()
+    {
+        return keyCache;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Wed Mar 16 17:52:53 2011
@@ -19,18 +19,15 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.base.Function;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.JMXInstrumentedCache;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.Pair;
 
@@ -42,43 +39,12 @@ public class SSTableTracker implements I
     private final AtomicLong liveSize = new AtomicLong();
     private final AtomicLong totalSize = new AtomicLong();
 
-    private final String ksname;
-    private final String cfname;
-
-    private final JMXInstrumentedCache<Pair<Descriptor,DecoratedKey>,Long> 
keyCache;
-    private final JMXInstrumentedCache<DecoratedKey, ColumnFamily> rowCache;
+    private final ColumnFamilyStore cfs;
 
-    public SSTableTracker(String ksname, String cfname)
+    public SSTableTracker(ColumnFamilyStore cfs)
     {
-        this.ksname = ksname;
-        this.cfname = cfname;
+        this.cfs = cfs;
         sstables = Collections.emptySet();
-        keyCache = new 
JMXInstrumentedCache<Pair<Descriptor,DecoratedKey>,Long>(ksname, cfname + 
"KeyCache", 0);
-        rowCache = new JMXInstrumentedCache<DecoratedKey, 
ColumnFamily>(ksname, cfname + "RowCache", 3);
-    }
-
-    public CacheWriter<Pair<Descriptor, DecoratedKey>, Long> 
getKeyCacheWriter()
-    {
-        Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new 
Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>()
-        {
-            public ByteBuffer apply(Pair<Descriptor, DecoratedKey> key)
-            {
-                return key.right.key;
-            }
-        };
-        return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname, 
keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), 
function);
-    }
-
-    public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter()
-    {
-        Function<DecoratedKey, ByteBuffer> function = new 
Function<DecoratedKey, ByteBuffer>()
-        {
-            public ByteBuffer apply(DecoratedKey key)
-            {
-                return key.key;
-            }
-        };
-        return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache, 
DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
     }
 
     public synchronized void replace(Collection<SSTableReader> oldSSTables, 
Iterable<SSTableReader> replacements)
@@ -90,7 +56,7 @@ public class SSTableTracker implements I
             assert sstable.getKeySamples() != null;
             if (logger.isDebugEnabled())
                 logger.debug(String.format("adding %s to list of files tracked for %s.%s",
-                                           sstable.descriptor, ksname, cfname));
+                                           sstable.descriptor, cfs.table.name, cfs.getColumnFamilyName()));
             sstablesNew.add(sstable);
             long size = sstable.bytesOnDisk();
             liveSize.addAndGet(size);
@@ -103,7 +69,7 @@ public class SSTableTracker implements I
         {
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files tracked for %s.%s",
-                                           sstable.descriptor, ksname, cfname));
+                                           sstable.descriptor, cfs.table.name, cfs.getColumnFamilyName()));
             boolean removed = sstablesNew.remove(sstable);
             assert removed;
             sstable.markCompacted();
@@ -132,29 +98,8 @@ public class SSTableTracker implements I
     public synchronized void updateCacheSizes()
     {
         long keys = estimatedKeys();
-
-        if (!keyCache.isCapacitySetManually())
-        {
-            int keyCacheSize = DatabaseDescriptor.getKeysCachedFor(ksname, 
cfname, keys);
-            if (keyCacheSize != keyCache.getCapacity())
-            {
-                // update cache size for the new key volume
-                if (logger.isDebugEnabled())
-                    logger.debug("key cache capacity for " + cfname + " is " + 
keyCacheSize);
-                keyCache.updateCapacity(keyCacheSize);
-            }
-        }
-
-        if (!rowCache.isCapacitySetManually())
-        {
-            int rowCacheSize = DatabaseDescriptor.getRowsCachedFor(ksname, 
cfname, keys);
-            if (rowCacheSize != rowCache.getCapacity())
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("row cache capacity for " + cfname + " is " + 
rowCacheSize);
-                rowCache.updateCapacity(rowCacheSize);
-            }
-        }
+        cfs.keyCache.updateCacheSize(keys);
+        cfs.rowCache.updateCacheSize(keys);
     }
 
     // the modifiers create new, unmodifiable objects each time; the volatile 
fences the assignment
@@ -179,9 +124,14 @@ public class SSTableTracker implements I
         sstables = Collections.emptySet();
     }
 
+    public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> getKeyCache()
+    {
+        return cfs.keyCache;
+    }
+
     public JMXInstrumentedCache<DecoratedKey, ColumnFamily> getRowCache()
     {
-        return rowCache;
+        return cfs.rowCache;
     }
 
     public long estimatedKeys()
@@ -208,10 +158,5 @@ public class SSTableTracker implements I
     {
         totalSize.addAndGet(-size);
     }
-
-    public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> 
getKeyCache()
-    {
-        return keyCache;
-    }
 }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1082236&r1=1082235&r2=1082236&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Mar 16 17:52:53 2011
@@ -34,18 +34,16 @@ import com.google.common.collect.ArrayLi
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.locator.*;
-import org.apache.cassandra.utils.*;
-import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.BootStrapper;
@@ -55,6 +53,10 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -63,6 +65,8 @@ import org.apache.cassandra.service.Anti
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.Constants;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.*;
+import org.apache.log4j.Level;
 import org.yaml.snakeyaml.Dumper;
 import org.yaml.snakeyaml.DumperOptions;
 import org.yaml.snakeyaml.Yaml;
@@ -2273,8 +2277,8 @@ public class StorageService implements I
         logger_.debug("submitting cache saves");
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            futures.add(cfs.submitKeyCacheWrite());
-            futures.add(cfs.submitRowCacheWrite());
+            futures.add(cfs.keyCache.submitWrite());
+            futures.add(cfs.rowCache.submitWrite());
         }
         FBUtilities.waitOnFutures(futures);
         logger_.debug("cache saves completed");


Reply via email to