Updated Branches:
  refs/heads/trunk dc920ab63 -> cfe585c2c

AutoSaving KeyCache and System load time improvements
patch by Vijay; reviewed by xedin and jbellis for CASSANDRA-3762


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cfe585c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cfe585c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cfe585c2

Branch: refs/heads/trunk
Commit: cfe585c2c420c6e8445eb4c3309b09db8cf134ac
Parents: dc920ab
Author: Vijay Parthasarathy <[email protected]>
Authored: Wed May 30 10:26:06 2012 -0700
Committer: Vijay Parthasarathy <[email protected]>
Committed: Wed May 30 10:26:06 2012 -0700

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java    |  128 ++++++++-------
 src/java/org/apache/cassandra/cache/CacheKey.java  |   13 --
 .../org/apache/cassandra/cache/KeyCacheKey.java    |   17 +--
 .../org/apache/cassandra/cache/RowCacheKey.java    |   13 --
 .../cassandra/config/DatabaseDescriptor.java       |    4 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   27 +---
 .../org/apache/cassandra/db/RowIndexEntry.java     |    7 +-
 .../cassandra/db/compaction/CompactionInfo.java    |   45 ++++--
 .../cassandra/db/compaction/CompactionManager.java |    4 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   19 +--
 .../org/apache/cassandra/service/CacheService.java |  108 ++++++++++++-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   10 +-
 .../unit/org/apache/cassandra/db/RowCacheTest.java |    2 +-
 13 files changed, 237 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 8a9e007..570ffbb 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -33,11 +33,12 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.Pair;
 
@@ -51,15 +52,19 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
     protected volatile ScheduledFuture<?> saveTask;
     protected final CacheService.CacheType cacheType;
 
-    public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType 
cacheType)
+    private CacheSerializer<K, V> cacheLoader;
+    private static final String CURRENT_VERSION = "b";
+
+    public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType 
cacheType, CacheSerializer<K, V> cacheloader)
     {
         super(cache);
         this.cacheType = cacheType;
+        this.cacheLoader = cacheloader;
     }
 
-    public File getCachePath(String ksName, String cfName)
+    public File getCachePath(String ksName, String cfName, String version)
     {
-        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, 
cacheType);
+        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, 
cacheType, version);
     }
 
     public Writer getWriter(int keysToSave)
@@ -90,41 +95,51 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
         }
     }
 
-    public Set<DecoratedKey> readSaved(String ksName, String cfName)
+    public int loadSaved(ColumnFamilyStore store)
     {
-        File path = getCachePath(ksName, cfName);
-        Set<DecoratedKey> keys = new TreeSet<DecoratedKey>();
+        int count = 0;
+        long start = System.currentTimeMillis();
+        File path = getCachePath(store.table.name, store.columnFamily, null);
         if (path.exists())
         {
             DataInputStream in = null;
             try
             {
-                long start = System.currentTimeMillis();
+                logger.info(String.format("reading saved cache %s", path));
+                in = new DataInputStream(new BufferedInputStream(new 
FileInputStream(path)));
+                Set<ByteBuffer> keys = new HashSet<ByteBuffer>();
+                while (in.available() > 0)
+                {
+                    keys.add(ByteBufferUtil.readWithLength(in));
+                    count++;
+                }
+                cacheLoader.load(keys, store);
+            }
+            catch (Exception e)
+            {
+                logger.warn(String.format("error reading saved cache %s, keys 
loaded so far: %d", path.getAbsolutePath(), count), e);
+                return count;
+            }
+            finally
+            {
+                FileUtils.closeQuietly(in);
+            }
+        }
 
+        path = getCachePath(store.table.name, store.columnFamily, 
CURRENT_VERSION);
+        if (path.exists())
+        {
+            DataInputStream in = null;
+            try
+            {
                 logger.info(String.format("reading saved cache %s", path));
                 in = new DataInputStream(new BufferedInputStream(new 
FileInputStream(path)));
                 while (in.available() > 0)
                 {
-                    int size = in.readInt();
-                    byte[] bytes = new byte[size];
-                    in.readFully(bytes);
-                    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                    DecoratedKey key;
-                    try
-                    {
-                        key = 
StorageService.getPartitioner().decorateKey(buffer);
-                    }
-                    catch (Exception e)
-                    {
-                        logger.info(String.format("unable to read entry #%s 
from saved cache %s; skipping remaining entries",
-                                keys.size(), path.getAbsolutePath()), e);
-                        break;
-                    }
-                    keys.add(key);
+                    Pair<K, V> entry = cacheLoader.deserialize(in, store);
+                    put(entry.left, entry.right);
+                    count++;
                 }
-                if (logger.isDebugEnabled())
-                    logger.debug(String.format("completed reading (%d ms; %d 
keys) saved cache %s",
-                            System.currentTimeMillis() - start, keys.size(), 
path));
             }
             catch (Exception e)
             {
@@ -135,7 +150,10 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
                 FileUtils.closeQuietly(in);
             }
         }
-        return keys;
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("completed reading (%d ms; %d keys) 
saved cache %s",
+                    System.currentTimeMillis() - start, count, path));
+        return count;
     }
 
     public Future<?> submitWrite(int keysToSave)
@@ -156,22 +174,11 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
         }
     }
 
-    public int estimateSizeToSave(Set<K> keys)
-    {
-        int bytes = 0;
-
-        for (K key : keys)
-            bytes += key.serializedSize();
-
-        return bytes;
-    }
-
     public class Writer extends CompactionInfo.Holder
     {
         private final Set<K> keys;
         private final CompactionInfo info;
-        private final long estimatedTotalBytes;
-        private long bytesWritten;
+        private long keysWritten;
 
         protected Writer(int keysToSave)
         {
@@ -180,9 +187,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
             else
                 keys = hotKeySet(keysToSave);
 
-            // an approximation -- the keyset can change while saving
-            estimatedTotalBytes = estimateSizeToSave(keys);
-
             OperationType type;
             if (cacheType == CacheService.CacheType.KEY_CACHE)
                 type = OperationType.KEY_CACHE_SAVE;
@@ -191,15 +195,13 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
             else
                 type = OperationType.UNKNOWN;
 
-            info = new CompactionInfo(type, 0, estimatedTotalBytes);
+            info = new CompactionInfo(type, 0, keys.size(), "keys");
         }
 
         public CompactionInfo getCompactionInfo()
         {
-            long bytesWritten = this.bytesWritten;
-            // keyset can change in size, thus totalBytes can too
-            return info.forProgress(bytesWritten,
-                                    Math.max(bytesWritten, 
estimatedTotalBytes));
+            // keyset can change in size, thus total can too
+            return info.forProgress(keysWritten, Math.max(keysWritten, 
keys.size()));
         }
 
         public void saveCache() throws IOException
@@ -207,7 +209,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
             logger.debug("Deleting old {} files.", cacheType);
             deleteOldCacheFiles();
 
-            if (keys.size() == 0 || estimatedTotalBytes == 0)
+            if (keys.size() == 0 || keys.size() == 0)
             {
                 logger.debug("Skipping {} save, cache is empty.", cacheType);
                 return;
@@ -219,7 +221,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
 
             try
             {
-                for (CacheKey key : keys)
+                for (K key : keys)
                 {
                     Pair<String, String> path = key.getPathInfo();
                     SequentialWriter writer = writers.get(path);
@@ -229,9 +231,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
                         writer = tempCacheFile(path);
                         writers.put(path, writer);
                     }
-
-                    key.write(writer.stream);
-                    bytesWritten += key.serializedSize();
+                    cacheLoader.serialize(key, writer.stream);
+                    keysWritten++;
                 }
             }
             finally
@@ -246,7 +247,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
                 SequentialWriter writer = info.getValue();
 
                 File tmpFile = new File(writer.getPath());
-                File cacheFile = getCachePath(path.left, path.right);
+                File cacheFile = getCachePath(path.left, path.right, 
CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
                 if (!tmpFile.renameTo(cacheFile))
@@ -258,13 +259,12 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
 
         private SequentialWriter tempCacheFile(Pair<String, String> pathInfo) 
throws IOException
         {
-            File path = getCachePath(pathInfo.left, pathInfo.right);
+            File path = getCachePath(pathInfo.left, pathInfo.right, 
CURRENT_VERSION);
             File tmpFile = File.createTempFile(path.getName(), null, 
path.getParentFile());
 
             return SequentialWriter.open(tmpFile, true);
         }
 
-
         private void deleteOldCacheFiles()
         {
             File savedCachesDir = new 
File(DatabaseDescriptor.getSavedCachesLocation());
@@ -278,8 +278,24 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
                         if (!file.delete())
                             logger.warn("Failed to delete {}", 
file.getAbsolutePath());
                     }
+
+                    if (file.isFile() && 
file.getName().endsWith(CURRENT_VERSION + ".db"))
+                    {
+                        if (!file.delete())
+                            logger.warn("Failed to delete {}", 
file.getAbsolutePath());
+                    }
                 }
             }
         }
     }
+
+    public interface CacheSerializer<K extends CacheKey, V>
+    {
+        void serialize(K key, DataOutput out) throws IOException;
+
+        Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore store) 
throws IOException;
+
+        @Deprecated
+        void load(Set<ByteBuffer> buffer, ColumnFamilyStore store);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/CacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/CacheKey.java 
b/src/java/org/apache/cassandra/cache/CacheKey.java
index 08b2fcf..5743dfc 100644
--- a/src/java/org/apache/cassandra/cache/CacheKey.java
+++ b/src/java/org/apache/cassandra/cache/CacheKey.java
@@ -17,24 +17,11 @@
  */
 package org.apache.cassandra.cache;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 import org.apache.cassandra.utils.Pair;
 
 public interface CacheKey
 {
     /**
-     * @return Serialized part of the key which should be persisted
-     */
-    public void write(DataOutputStream out) throws IOException;
-
-    /**
-     * @return The size of the serialized key
-     */
-    public int serializedSize();
-
-    /**
      * @return The keyspace and ColumnFamily names to which this key belongs
      */
     public Pair<String, String> getPathInfo();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/KeyCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java 
b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index ce83726..bee88e8 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -17,21 +17,18 @@
  */
 package org.apache.cassandra.cache;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.Arrays;
 
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
 public class KeyCacheKey implements CacheKey
 {
-    private final Descriptor desc;
-    private final byte[] key;
+    public final Descriptor desc;
+    public final byte[] key;
 
     public KeyCacheKey(Descriptor desc, ByteBuffer key)
     {
@@ -40,21 +37,11 @@ public class KeyCacheKey implements CacheKey
         assert this.key != null;
     }
 
-    public void write(DataOutputStream out) throws IOException
-    {
-        ByteBufferUtil.writeWithLength(key, out);
-    }
-
     public Pair<String, String> getPathInfo()
     {
         return new Pair<String, String>(desc.ksname, desc.cfname);
     }
 
-    public int serializedSize()
-    {
-        return TypeSizes.NATIVE.sizeof(key.length) + key.length;
-    }
-
     public String toString()
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java 
b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index 009483c..10a80b6 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -17,14 +17,11 @@
  */
 package org.apache.cassandra.cache;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.UUID;
 
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -47,21 +44,11 @@ public class RowCacheKey implements CacheKey, 
Comparable<RowCacheKey>
         assert this.key != null;
     }
 
-    public void write(DataOutputStream out) throws IOException
-    {
-        ByteBufferUtil.writeWithLength(key, out);
-    }
-
     public Pair<String, String> getPathInfo()
     {
         return Schema.instance.getCF(cfId);
     }
 
-    public int serializedSize()
-    {
-        return key.length + TypeSizes.NATIVE.sizeof(key.length);
-    }
-
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bb48ff7..06344cc 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -901,9 +901,9 @@ public class DatabaseDescriptor
         return conf.index_interval;
     }
 
-    public static File getSerializedCachePath(String ksName, String cfName, 
CacheService.CacheType cacheType)
+    public static File getSerializedCachePath(String ksName, String cfName, 
CacheService.CacheType cacheType, String version)
     {
-        return new File(conf.saved_caches_directory + File.separator + ksName 
+ "-" + cfName + "-" + cacheType);
+        return new File(conf.saved_caches_directory + File.separator + ksName 
+ "-" + cfName + "-" + cacheType + ((version != null) ? "-" + version + ".db" : 
""));
     }
 
     public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2891585..11f9ec9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -34,7 +34,6 @@ import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
@@ -223,12 +222,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
-        Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == 
Caching.ROWS_ONLY
-                                       ? Collections.<DecoratedKey>emptySet()
-                                       : 
CacheService.instance.keyCache.readSaved(table.name, columnFamily);
-
         Directories.SSTableLister sstables = 
directories.sstableLister().skipCompacted(true).skipTemporary(true);
-        
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
savedKeys, data, metadata, this.partitioner));
+        
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
data, metadata, this.partitioner));
+        if (caching != Caching.NONE || caching != Caching.ROWS_ONLY)
+            CacheService.instance.keyCache.loadSaved(this);
 
         // compaction strategy should be created after the CFS has been 
prepared
         this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
@@ -405,19 +402,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         long start = System.currentTimeMillis();
 
-        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = 
CacheService.instance.rowCache;
-
-        // results are sorted on read (via treeset) because there are few 
reads and many writes and reads only happen at startup
-        int cachedRowsRead = 0;
-        for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily))
-        {
-            ColumnFamily data = 
getTopLevelColumns(QueryFilter.getIdentityFilter(key, new 
QueryPath(columnFamily)),
-                                                   Integer.MIN_VALUE,
-                                                   true);
-            CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, 
key), data);
-            cachedRowsRead++;
-        }
-
+        int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
         if (cachedRowsRead > 0)
             logger.info(String.format("completed loading (%d ms; %d keys) row 
cache for %s.%s",
                         System.currentTimeMillis() - start,
@@ -478,7 +463,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             SSTableReader reader;
             try
             {
-                reader = SSTableReader.open(newDescriptor, entry.getValue(), 
Collections.<DecoratedKey>emptySet(), data, metadata, partitioner);
+                reader = SSTableReader.open(newDescriptor, entry.getValue(), 
data, metadata, partitioner);
             }
             catch (IOException e)
             {
@@ -1294,7 +1279,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return new ViewFragment(sstables, 
Iterables.concat(Collections.singleton(view.memtable), 
view.memtablesPendingFlush));
     }
 
-    private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
boolean forCache)
+    public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
boolean forCache)
     {
         CollationController controller = new CollationController(this, 
forCache, filter, gcBefore);
         ColumnFamily columns = controller.getTopLevelColumns();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java 
b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 8ec1ab3..0c50746 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -133,9 +133,12 @@ public class RowIndexEntry
         public void skip(DataInput dis, Descriptor.Version version) throws 
IOException
         {
             dis.readLong();
-            if (!version.hasPromotedIndexes)
-                return;
+            if (version.hasPromotedIndexes)
+                skipPromotedIndex(dis);
+        }
 
+        public void skipPromotedIndex(DataInput dis) throws IOException
+        {
             int size = dis.readInt();
             if (size <= 0)
                 return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index cc46c31..d0f7efb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -32,8 +32,9 @@ public final class CompactionInfo implements Serializable
     private static final long serialVersionUID = 3695381572726744816L;
     private final CFMetaData cfm;
     private final OperationType tasktype;
-    private final long bytesComplete;
-    private final long totalBytes;
+    private final long completed;
+    private final long total;
+    private final String unit;
 
     public CompactionInfo(OperationType tasktype, long bytesComplete, long 
totalBytes)
     {
@@ -42,16 +43,27 @@ public final class CompactionInfo implements Serializable
 
     public CompactionInfo(UUID id, OperationType tasktype, long bytesComplete, 
long totalBytes)
     {
+        this(id, tasktype, bytesComplete, totalBytes, "bytes");
+    }
+
+    public CompactionInfo(OperationType tasktype, long completed, long total, 
String unit)
+    {
+        this(null, tasktype, completed, total, unit);
+    }
+
+    public CompactionInfo(UUID id, OperationType tasktype, long completed, 
long total, String unit)
+    {
         this.tasktype = tasktype;
-        this.bytesComplete = bytesComplete;
-        this.totalBytes = totalBytes;
+        this.completed = completed;
+        this.total = total;
         this.cfm = id == null ? null : Schema.instance.getCFMetaData(id);
+        this.unit = unit;
     }
 
     /** @return A copy of this CompactionInfo with updated progress. */
     public CompactionInfo forProgress(long bytesComplete, long totalBytes)
     {
-        return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, 
bytesComplete, totalBytes);
+        return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, 
bytesComplete, totalBytes, unit);
     }
 
     public UUID getId()
@@ -74,14 +86,14 @@ public final class CompactionInfo implements Serializable
         return cfm;
     }
 
-    public long getBytesComplete()
+    public long getCompleted()
     {
-        return bytesComplete;
+        return completed;
     }
 
-    public long getTotalBytes()
+    public long getTotal()
     {
-        return totalBytes;
+        return total;
     }
 
     public OperationType getTaskType()
@@ -94,19 +106,20 @@ public final class CompactionInfo implements Serializable
         StringBuilder buff = new StringBuilder();
         buff.append(getTaskType()).append('@').append(getId());
         buff.append('(').append(getKeyspace()).append(", 
").append(getColumnFamily());
-        buff.append(", 
").append(getBytesComplete()).append('/').append(getTotalBytes());
-        return buff.append(')').toString();
+        buff.append(", 
").append(getCompleted()).append('/').append(getTotal());
+        return buff.append(')').append(unit).toString();
     }
 
     public Map<String, String> asMap()
     {
         Map<String, String> ret = new HashMap<String, String>();
-        ret.put("id", getId().toString());
+        ret.put("id", getId() == null ? "" : getId().toString());
         ret.put("keyspace", getKeyspace());
         ret.put("columnfamily", getColumnFamily());
-        ret.put("bytesComplete", Long.toString(bytesComplete));
-        ret.put("totalBytes", Long.toString(totalBytes));
+        ret.put("completed", Long.toString(completed));
+        ret.put("total", Long.toString(total));
         ret.put("taskType", tasktype.toString());
+        ret.put("unit", unit);
         return ret;
     }
 
@@ -131,7 +144,7 @@ public final class CompactionInfo implements Serializable
          */
         public void started()
         {
-            reportedSeverity = 
StorageService.instance.reportSeverity(getCompactionInfo().getTotalBytes()/load);
+            reportedSeverity = 
StorageService.instance.reportSeverity(getCompactionInfo().getTotal()/load);
         }
 
         /**
@@ -140,7 +153,7 @@ public final class CompactionInfo implements Serializable
         public void finished()
         {
             if (reportedSeverity)
-                
StorageService.instance.reportSeverity(-(getCompactionInfo().getTotalBytes()/load));
+                
StorageService.instance.reportSeverity(-(getCompactionInfo().getTotal()/load));
             reportedSeverity = false;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 68c23fd..5f530a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1052,7 +1052,7 @@ public class CompactionManager implements 
CompactionManagerMBean
             // notify
             ci.finished();
             compactions.remove(ci);
-            totalBytesCompacted += ci.getCompactionInfo().getTotalBytes();
+            totalBytesCompacted += ci.getCompactionInfo().getTotal();
             totalCompactionsCompleted += 1;
         }
 
@@ -1065,7 +1065,7 @@ public class CompactionManager implements 
CompactionManagerMBean
         {
             long bytesCompletedInProgress = 0L;
             for (CompactionInfo.Holder ci : compactions)
-                bytesCompletedInProgress += 
ci.getCompactionInfo().getBytesComplete();
+                bytesCompletedInProgress += 
ci.getCompactionInfo().getCompleted();
             return bytesCompletedInProgress + totalBytesCompacted;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 895cd59..9e576dc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -138,10 +138,10 @@ public class SSTableReader extends SSTable
 
     public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        return open(descriptor, components, 
Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
+        return open(descriptor, components, null, metadata, partitioner);
     }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData 
metadata, IPartitioner partitioner) throws IOException
+    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) 
throws IOException
     {
         assert partitioner != null;
         // Minimum components without which we can't do anything
@@ -178,11 +178,11 @@ public class SSTableReader extends SSTable
         // versions before 'c' encoded keys as utf-16 before hashing to the 
filter
         if (descriptor.version.hasStringsInBloomFilter)
         {
-            sstable.load(true, savedKeys);
+            sstable.load(true);
         }
         else
         {
-            sstable.load(false, savedKeys);
+            sstable.load(false);
             sstable.loadBloomFilter();
         }
         if (logger.isDebugEnabled())
@@ -203,7 +203,6 @@ public class SSTableReader extends SSTable
     }
 
     public static Collection<SSTableReader> 
batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
-                                                      final Set<DecoratedKey> 
savedKeys,
                                                       final DataTracker 
tracker,
                                                       final CFMetaData 
metadata,
                                                       final IPartitioner 
partitioner)
@@ -220,7 +219,7 @@ public class SSTableReader extends SSTable
                     SSTableReader sstable;
                     try
                     {
-                        sstable = open(entry.getKey(), entry.getValue(), 
savedKeys, tracker, metadata, partitioner);
+                        sstable = open(entry.getKey(), entry.getValue(), 
tracker, metadata, partitioner);
                     }
                     catch (IOException ex)
                     {
@@ -328,10 +327,8 @@ public class SSTableReader extends SSTable
     /**
      * Loads ifile, dfile and indexSummary, and optionally recreates the bloom 
filter.
      */
-    private void load(boolean recreatebloom, Set<DecoratedKey> 
keysToLoadInCache) throws IOException
+    private void load(boolean recreatebloom) throws IOException
     {
-        boolean cacheLoading = keyCache != null && 
!keysToLoadInCache.isEmpty();
-
         SegmentedFile.Builder ibuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
         SegmentedFile.Builder dbuilder = compression
                                           ? 
SegmentedFile.getCompressedBuilder()
@@ -343,7 +340,7 @@ public class SSTableReader extends SSTable
         // try to load summaries from the disk and check if we need
         // to read primary index because we should re-create a BloomFilter or 
pre-load KeyCache
         final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
-        final boolean readIndex = recreatebloom || cacheLoading || 
!summaryLoaded;
+        final boolean readIndex = recreatebloom || !summaryLoaded;
         try
         {
             long indexSize = primaryIndex.length();
@@ -369,8 +366,6 @@ public class SSTableReader extends SSTable
 
                 if (recreatebloom)
                     bf.add(decoratedKey.key);
-                if (cacheLoading && keysToLoadInCache.contains(decoratedKey))
-                    cacheKey(decoratedKey, indexEntry);
 
                 // if summary was already read from disk we don't want to 
re-populate it using primary index
                 if (!summaryLoaded)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java 
b/src/java/org/apache/cassandra/service/CacheService.java
index 3a2d8e9..e66d995 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -17,9 +17,15 @@
  */
 package org.apache.cassandra.service;
 
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -27,10 +33,21 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableReader.Operator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,7 +118,7 @@ public class CacheService implements CacheServiceMBean
         // as values are constant size we can use singleton weigher
         // where 48 = 40 bytes (average size of the key) + 8 bytes (size of 
value)
         ICache<KeyCacheKey, RowIndexEntry> kc = 
ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / 
AVERAGE_KEY_CACHE_ROW_SIZE);
-        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new 
AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE);
+        AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new 
AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new 
KeyCacheSerializer());
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
 
@@ -127,7 +144,7 @@ public class CacheService implements CacheServiceMBean
 
         // cache object
         ICache<RowCacheKey, IRowCacheEntry> rc = 
DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true);
-        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new 
AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE);
+        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new 
AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new 
RowCacheSerializer());
 
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
 
@@ -281,4 +298,91 @@ public class CacheService implements CacheServiceMBean
         FBUtilities.waitOnFutures(futures);
         logger.debug("cache saves completed");
     }
+
+    public class RowCacheSerializer implements CacheSerializer<RowCacheKey, 
IRowCacheEntry>
+    {
+        public void serialize(RowCacheKey key, DataOutput out) throws 
IOException
+        {
+            ByteBufferUtil.writeWithLength(key.key, out);
+        }
+
+        public Pair<RowCacheKey, IRowCacheEntry> deserialize(DataInputStream 
in, ColumnFamilyStore store) throws IOException
+        {
+            ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            DecoratedKey key = 
StorageService.getPartitioner().decorateKey(buffer);
+            ColumnFamily data = 
store.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new 
QueryPath(store.columnFamily)), Integer.MIN_VALUE, true);
+            return new Pair<RowCacheKey, IRowCacheEntry>(new 
RowCacheKey(store.metadata.cfId, key), data);
+        }
+
+        @Override
+        public void load(Set<ByteBuffer> buffers, ColumnFamilyStore store)
+        {
+            for (ByteBuffer key : buffers)
+            {
+                DecoratedKey dk = 
StorageService.getPartitioner().decorateKey(key);
+                ColumnFamily data = 
store.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, new 
QueryPath(store.columnFamily)), Integer.MIN_VALUE, true);
+                rowCache.put(new RowCacheKey(store.metadata.cfId, dk), data);
+            }
+        }
+    }
+
+    public class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, 
RowIndexEntry>
+    {
+        public void serialize(KeyCacheKey key, DataOutput out) throws 
IOException
+        {
+            RowIndexEntry entry = CacheService.instance.keyCache.get(key);
+            if (entry == null)
+                return;
+            ByteBufferUtil.writeWithLength(key.key, out);
+            Descriptor desc = key.desc;
+            out.writeInt(desc.generation);
+            out.writeBoolean(desc.version.hasPromotedIndexes);
+            if (!desc.version.hasPromotedIndexes)
+                return;
+            RowIndexEntry.serializer.serialize(entry, out);
+        }
+
+        public Pair<KeyCacheKey, RowIndexEntry> deserialize(DataInputStream 
input, ColumnFamilyStore store) throws IOException
+        {
+            ByteBuffer key = ByteBufferUtil.readWithLength(input);
+            int generation = input.readInt();
+            SSTableReader reader = findDesc(generation, store.getSSTables());
+            if (reader == null)
+            {
+                RowIndexEntry.serializer.skipPromotedIndex(input);
+                return null;
+            }
+            RowIndexEntry entry;
+            if (input.readBoolean())
+                entry = RowIndexEntry.serializer.deserialize(input, 
reader.descriptor.version);
+            else
+                entry = 
reader.getPosition(StorageService.getPartitioner().decorateKey(key), 
Operator.EQ);
+            return new Pair<KeyCacheKey, RowIndexEntry>(new 
KeyCacheKey(reader.descriptor, key), entry);
+        }
+
+        private SSTableReader findDesc(int generation, 
Collection<SSTableReader> collection)
+        {
+            for (SSTableReader sstable : collection)
+            {
+                if (sstable.descriptor.generation == generation)
+                    return sstable;
+            }
+            return null;
+        }
+
+        @Override
+        public void load(Set<ByteBuffer> buffers, ColumnFamilyStore store)
+        {
+            for (ByteBuffer key : buffers)
+            {
+                DecoratedKey dk = 
StorageService.getPartitioner().decorateKey(key);
+                for (SSTableReader sstable : store.getSSTables())
+                {
+                    RowIndexEntry entry = sstable.getPosition(dk, Operator.EQ);
+                    if (entry != null)
+                        keyCache.put(new KeyCacheKey(sstable.descriptor, key), 
entry);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 85e2c45..3d9ee29 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -472,16 +472,16 @@ public class NodeCmd
         CompactionManagerMBean cm = probe.getCompactionManagerProxy();
         outs.println("pending tasks: " + cm.getPendingTasks());
         if (cm.getCompactions().size() > 0)
-            outs.printf("%25s%16s%16s%16s%16s%10s%n", "compaction type", 
"keyspace", "column family", "bytes compacted", "bytes total", "progress");
+            outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", 
"keyspace", "column family", "completed", "total", "unit", "progress");
         long remainingBytes = 0;
         for (Map<String, String> c : cm.getCompactions())
         {
-            String percentComplete = new Long(c.get("totalBytes")) == 0
+            String percentComplete = new Long(c.get("total")) == 0
                                    ? "n/a"
-                                   : new DecimalFormat("0.00").format((double) 
new Long(c.get("bytesComplete")) / new Long(c.get("totalBytes")) * 100) + "%";
-            outs.printf("%25s%16s%16s%16s%16s%10s%n", c.get("taskType"), 
c.get("keyspace"), c.get("columnfamily"), c.get("bytesComplete"), 
c.get("totalBytes"), percentComplete);
+                                   : new DecimalFormat("0.00").format((double) 
new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%";
+            outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), 
c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), 
c.get("unit"), percentComplete);
             if (c.get("taskType").equals(OperationType.COMPACTION.toString()))
-                remainingBytes += (new Long(c.get("totalBytes")) - new 
Long(c.get("bytesComplete")));
+                remainingBytes += (new Long(c.get("total")) - new 
Long(c.get("completed")));
         }
         long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes 
== 0
                         ? -1 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java 
b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index e6b4578..99b8cbc 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -153,6 +153,6 @@ public class RowCacheTest extends SchemaLoader
         // empty the cache again to make sure values came from disk
         CacheService.instance.invalidateRowCache();
         assert CacheService.instance.rowCache.size() == 0;
-        assert CacheService.instance.rowCache.readSaved(KEYSPACE, 
COLUMN_FAMILY).size() == (keysToSave == Integer.MAX_VALUE ? totalKeys : 
keysToSave);
+        assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave 
== Integer.MAX_VALUE ? totalKeys : keysToSave);
     }
 }

Reply via email to