Updated Branches: refs/heads/trunk dc920ab63 -> cfe585c2c
AutoSaving KeyCache and System load time improvements patch by Vijay; reviewed by xedin and jbellis for CASSANDRA-3762 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cfe585c2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cfe585c2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cfe585c2 Branch: refs/heads/trunk Commit: cfe585c2c420c6e8445eb4c3309b09db8cf134ac Parents: dc920ab Author: Vijay Parthasarathy <[email protected]> Authored: Wed May 30 10:26:06 2012 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Wed May 30 10:26:06 2012 -0700 ---------------------------------------------------------------------- .../apache/cassandra/cache/AutoSavingCache.java | 128 ++++++++------- src/java/org/apache/cassandra/cache/CacheKey.java | 13 -- .../org/apache/cassandra/cache/KeyCacheKey.java | 17 +-- .../org/apache/cassandra/cache/RowCacheKey.java | 13 -- .../cassandra/config/DatabaseDescriptor.java | 4 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 27 +--- .../org/apache/cassandra/db/RowIndexEntry.java | 7 +- .../cassandra/db/compaction/CompactionInfo.java | 45 ++++-- .../cassandra/db/compaction/CompactionManager.java | 4 +- .../apache/cassandra/io/sstable/SSTableReader.java | 19 +-- .../org/apache/cassandra/service/CacheService.java | 108 ++++++++++++- src/java/org/apache/cassandra/tools/NodeCmd.java | 10 +- .../unit/org/apache/cassandra/db/RowCacheTest.java | 2 +- 13 files changed, 237 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 8a9e007..570ffbb 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -33,11 +33,12 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.Pair; @@ -51,15 +52,19 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K protected volatile ScheduledFuture<?> saveTask; protected final CacheService.CacheType cacheType; - public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType) + private CacheSerializer<K, V> cacheLoader; + private static final String CURRENT_VERSION = "b"; + + public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader) { super(cache); this.cacheType = cacheType; + this.cacheLoader = cacheloader; } - public File getCachePath(String ksName, String cfName) + public File getCachePath(String ksName, String cfName, String version) { - return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cacheType); + return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cacheType, version); } public Writer getWriter(int keysToSave) @@ -90,41 +95,51 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } } - public Set<DecoratedKey> readSaved(String ksName, String cfName) + public int loadSaved(ColumnFamilyStore store) { - File path = getCachePath(ksName, cfName); - Set<DecoratedKey> keys = new TreeSet<DecoratedKey>(); + int count = 0; + long start = System.currentTimeMillis(); + File path = getCachePath(store.table.name, store.columnFamily, null); if (path.exists()) { DataInputStream in = null; try { - long start = System.currentTimeMillis(); + logger.info(String.format("reading saved cache %s", path)); + in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); + Set<ByteBuffer> keys = new HashSet<ByteBuffer>(); + while (in.available() > 0) + { + keys.add(ByteBufferUtil.readWithLength(in)); + count++; + } + cacheLoader.load(keys, store); + } + catch (Exception e) + { + logger.warn(String.format("error reading saved cache %s, keys loaded so far: %d", path.getAbsolutePath(), count), e); + return count; + } + finally + { + FileUtils.closeQuietly(in); + } + } + path = getCachePath(store.table.name, store.columnFamily, CURRENT_VERSION); + if (path.exists()) + { + DataInputStream in = null; + try + { logger.info(String.format("reading saved cache %s", path)); in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); while (in.available() > 0) { - int size = in.readInt(); - byte[] bytes = new byte[size]; - in.readFully(bytes); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - DecoratedKey key; - try - { - key = StorageService.getPartitioner().decorateKey(buffer); - } - catch (Exception e) - { - logger.info(String.format("unable to read entry #%s from saved cache %s; skipping remaining entries", - keys.size(), path.getAbsolutePath()), e); - break; - } - keys.add(key); + Pair<K, V> entry = cacheLoader.deserialize(in, store); + put(entry.left, entry.right); + count++; } - if (logger.isDebugEnabled()) - logger.debug(String.format("completed reading (%d ms; %d keys) saved cache %s", - System.currentTimeMillis() - start, keys.size(), path)); } catch (Exception e) { @@ -135,7 +150,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K FileUtils.closeQuietly(in); } } - return keys; + if (logger.isDebugEnabled()) + logger.debug(String.format("completed reading (%d ms; %d keys) saved cache %s", + System.currentTimeMillis() - start, count, path)); + return count; } public Future<?> submitWrite(int keysToSave) @@ -156,22 +174,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } } - public int estimateSizeToSave(Set<K> keys) - { - int bytes = 0; - - for (K key : keys) - bytes += key.serializedSize(); - - return bytes; - } - public class Writer extends CompactionInfo.Holder { private final Set<K> keys; private final CompactionInfo info; - private final long estimatedTotalBytes; - private long bytesWritten; + private long keysWritten; protected Writer(int keysToSave) { @@ -180,9 +187,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K else keys = hotKeySet(keysToSave); - // an approximation -- the keyset can change while saving - estimatedTotalBytes = estimateSizeToSave(keys); - OperationType type; if (cacheType == CacheService.CacheType.KEY_CACHE) type = OperationType.KEY_CACHE_SAVE; @@ -191,15 +195,13 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K else type = OperationType.UNKNOWN; - info = new CompactionInfo(type, 0, estimatedTotalBytes); + info = new CompactionInfo(type, 0, keys.size(), "keys"); } public CompactionInfo getCompactionInfo() { - long bytesWritten = this.bytesWritten; - // keyset can change in size, thus totalBytes can too - return info.forProgress(bytesWritten, - Math.max(bytesWritten, estimatedTotalBytes)); + // keyset can change in size, thus total can too + return info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); } public void saveCache() throws IOException @@ -207,7 +209,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K logger.debug("Deleting old {} files.", cacheType); deleteOldCacheFiles(); - if (keys.size() == 0 || estimatedTotalBytes == 0) + if (keys.size() == 0 || keys.size() == 0) { logger.debug("Skipping {} save, cache is empty.", cacheType); return; @@ -219,7 +221,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K try { - for (CacheKey key : keys) + for (K key : keys) { Pair<String, String> path = key.getPathInfo(); SequentialWriter writer = writers.get(path); @@ -229,9 +231,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K writer = tempCacheFile(path); writers.put(path, writer); } - - key.write(writer.stream); - bytesWritten += key.serializedSize(); + cacheLoader.serialize(key, writer.stream); + keysWritten++; } } finally @@ -246,7 +247,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K SequentialWriter writer = info.getValue(); File tmpFile = new File(writer.getPath()); - File cacheFile = getCachePath(path.left, path.right); + File cacheFile = getCachePath(path.left, path.right, CURRENT_VERSION); cacheFile.delete(); // ignore error if it didn't exist if (!tmpFile.renameTo(cacheFile)) @@ -258,13 +259,12 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K private SequentialWriter tempCacheFile(Pair<String, String> pathInfo) throws IOException { - File path = getCachePath(pathInfo.left, pathInfo.right); + File path = getCachePath(pathInfo.left, pathInfo.right, CURRENT_VERSION); File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile()); return SequentialWriter.open(tmpFile, true); } - private void deleteOldCacheFiles() { File savedCachesDir = new File(DatabaseDescriptor.getSavedCachesLocation()); @@ -278,8 +278,24 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K if (!file.delete()) logger.warn("Failed to delete {}", file.getAbsolutePath()); } + + if (file.isFile() && file.getName().endsWith(CURRENT_VERSION + ".db")) + { + if (!file.delete()) + logger.warn("Failed to delete {}", file.getAbsolutePath()); + } } } } } + + public interface CacheSerializer<K extends CacheKey, V> + { + void serialize(K key, DataOutput out) throws IOException; + + Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore store) throws IOException; + + @Deprecated + void load(Set<ByteBuffer> buffer, ColumnFamilyStore store); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/CacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CacheKey.java b/src/java/org/apache/cassandra/cache/CacheKey.java index 08b2fcf..5743dfc 100644 --- a/src/java/org/apache/cassandra/cache/CacheKey.java +++ b/src/java/org/apache/cassandra/cache/CacheKey.java @@ -17,24 +17,11 @@ */ package org.apache.cassandra.cache; -import java.io.DataOutputStream; -import java.io.IOException; - import org.apache.cassandra.utils.Pair; public interface CacheKey { /** - * @return Serialized part of the key which should be persisted - */ - public void write(DataOutputStream out) throws IOException; - - /** - * @return The size of the serialized key - */ - public int serializedSize(); - - /** * @return The keyspace and ColumnFamily names to which this key belongs */ public Pair<String, String> getPathInfo(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/KeyCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java b/src/java/org/apache/cassandra/cache/KeyCacheKey.java index ce83726..bee88e8 100644 --- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java +++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java @@ -17,21 +17,18 @@ */ package org.apache.cassandra.cache; -import java.io.DataOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Arrays; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; public class KeyCacheKey implements CacheKey { - private final Descriptor desc; - private final byte[] key; + public final Descriptor desc; + public final byte[] key; public KeyCacheKey(Descriptor desc, ByteBuffer key) { @@ -40,21 +37,11 @@ public class KeyCacheKey implements CacheKey assert this.key != null; } - public void write(DataOutputStream out) throws IOException - { - ByteBufferUtil.writeWithLength(key, out); - } - public Pair<String, String> getPathInfo() { return new Pair<String, String>(desc.ksname, desc.cfname); } - public int serializedSize() - { - return TypeSizes.NATIVE.sizeof(key.length) + key.length; - } - public String toString() { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/cache/RowCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java index 009483c..10a80b6 100644 --- a/src/java/org/apache/cassandra/cache/RowCacheKey.java +++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java @@ -17,14 +17,11 @@ */ package org.apache.cassandra.cache; -import java.io.DataOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.UUID; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -47,21 +44,11 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey> assert this.key != null; } - public void write(DataOutputStream out) throws IOException - { - ByteBufferUtil.writeWithLength(key, out); - } - public Pair<String, String> getPathInfo() { return Schema.instance.getCF(cfId); } - public int serializedSize() - { - return key.length + TypeSizes.NATIVE.sizeof(key.length); - } - @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bb48ff7..06344cc 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -901,9 +901,9 @@ public class DatabaseDescriptor return conf.index_interval; } - public static File getSerializedCachePath(String ksName, String cfName, CacheService.CacheType cacheType) + public static File getSerializedCachePath(String ksName, String cfName, CacheService.CacheType cacheType, String version) { - return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType); + return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + ((version != null) ? "-" + version + ".db" : "")); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2891585..11f9ec9 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -34,7 +34,6 @@ import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; @@ -223,12 +222,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // scan for sstables corresponding to this cf and load them data = new DataTracker(this); - Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == Caching.ROWS_ONLY - ? Collections.<DecoratedKey>emptySet() - : CacheService.instance.keyCache.readSaved(table.name, columnFamily); - Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true); - data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner)); + data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner)); + if (caching != Caching.NONE || caching != Caching.ROWS_ONLY) + CacheService.instance.keyCache.loadSaved(this); // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); @@ -405,19 +402,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long start = System.currentTimeMillis(); - AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache; - - // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup - int cachedRowsRead = 0; - for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily)) - { - ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)), - Integer.MIN_VALUE, - true); - CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, key), data); - cachedRowsRead++; - } - + int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this); if (cachedRowsRead > 0) logger.info(String.format("completed loading (%d ms; %d keys) row cache for %s.%s", System.currentTimeMillis() - start, @@ -478,7 +463,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(), data, metadata, partitioner); + reader = SSTableReader.open(newDescriptor, entry.getValue(), data, metadata, partitioner); } catch (IOException e) { @@ -1294,7 +1279,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush)); } - private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, boolean forCache) + public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, boolean forCache) { CollationController controller = new CollationController(this, forCache, filter, gcBefore); ColumnFamily columns = controller.getTopLevelColumns(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index 8ec1ab3..0c50746 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -133,9 +133,12 @@ public class RowIndexEntry public void skip(DataInput dis, Descriptor.Version version) throws IOException { dis.readLong(); - if (!version.hasPromotedIndexes) - return; + if (version.hasPromotedIndexes) + skipPromotedIndex(dis); + } + public void skipPromotedIndex(DataInput dis) throws IOException + { int size = dis.readInt(); if (size <= 0) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index cc46c31..d0f7efb 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -32,8 +32,9 @@ public final class CompactionInfo implements Serializable private static final long serialVersionUID = 3695381572726744816L; private final CFMetaData cfm; private final OperationType tasktype; - private final long bytesComplete; - private final long totalBytes; + private final long completed; + private final long total; + private final String unit; public CompactionInfo(OperationType tasktype, long bytesComplete, long totalBytes) { @@ -42,16 +43,27 @@ public final class CompactionInfo implements Serializable public CompactionInfo(UUID id, OperationType tasktype, long bytesComplete, long totalBytes) { + this(id, tasktype, bytesComplete, totalBytes, "bytes"); + } + + public CompactionInfo(OperationType tasktype, long completed, long total, String unit) + { + this(null, tasktype, completed, total, unit); + } + + public CompactionInfo(UUID id, OperationType tasktype, long completed, long total, String unit) + { this.tasktype = tasktype; - this.bytesComplete = bytesComplete; - this.totalBytes = totalBytes; + this.completed = completed; + this.total = total; this.cfm = id == null ? null : Schema.instance.getCFMetaData(id); + this.unit = unit; } /** @return A copy of this CompactionInfo with updated progress. */ public CompactionInfo forProgress(long bytesComplete, long totalBytes) { - return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, bytesComplete, totalBytes); + return new CompactionInfo(cfm == null ? null : cfm.cfId, tasktype, bytesComplete, totalBytes, unit); } public UUID getId() @@ -74,14 +86,14 @@ public final class CompactionInfo implements Serializable return cfm; } - public long getBytesComplete() + public long getCompleted() { - return bytesComplete; + return completed; } - public long getTotalBytes() + public long getTotal() { - return totalBytes; + return total; } public OperationType getTaskType() @@ -94,19 +106,20 @@ public final class CompactionInfo implements Serializable StringBuilder buff = new StringBuilder(); buff.append(getTaskType()).append('@').append(getId()); buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily()); - buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes()); - return buff.append(')').toString(); + buff.append(", ").append(getCompleted()).append('/').append(getTotal()); + return buff.append(')').append(unit).toString(); } public Map<String, String> asMap() { Map<String, String> ret = new HashMap<String, String>(); - ret.put("id", getId().toString()); + ret.put("id", getId() == null ? "" : getId().toString()); ret.put("keyspace", getKeyspace()); ret.put("columnfamily", getColumnFamily()); - ret.put("bytesComplete", Long.toString(bytesComplete)); - ret.put("totalBytes", Long.toString(totalBytes)); + ret.put("completed", Long.toString(completed)); + ret.put("total", Long.toString(total)); ret.put("taskType", tasktype.toString()); + ret.put("unit", unit); return ret; } @@ -131,7 +144,7 @@ public final class CompactionInfo implements Serializable */ public void started() { - reportedSeverity = StorageService.instance.reportSeverity(getCompactionInfo().getTotalBytes()/load); + reportedSeverity = StorageService.instance.reportSeverity(getCompactionInfo().getTotal()/load); } /** @@ -140,7 +153,7 @@ public final class CompactionInfo implements Serializable public void finished() { if (reportedSeverity) - StorageService.instance.reportSeverity(-(getCompactionInfo().getTotalBytes()/load)); + StorageService.instance.reportSeverity(-(getCompactionInfo().getTotal()/load)); reportedSeverity = false; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 68c23fd..5f530a2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1052,7 +1052,7 @@ public class CompactionManager implements CompactionManagerMBean // notify ci.finished(); compactions.remove(ci); - totalBytesCompacted += ci.getCompactionInfo().getTotalBytes(); + totalBytesCompacted += ci.getCompactionInfo().getTotal(); totalCompactionsCompleted += 1; } @@ -1065,7 +1065,7 @@ public class CompactionManager implements CompactionManagerMBean { long bytesCompletedInProgress = 0L; for (CompactionInfo.Holder ci : compactions) - bytesCompletedInProgress += ci.getCompactionInfo().getBytesComplete(); + bytesCompletedInProgress += ci.getCompactionInfo().getCompleted(); return bytesCompletedInProgress + totalBytesCompacted; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 895cd59..9e576dc 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -138,10 +138,10 @@ public class SSTableReader extends SSTable public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, partitioner); + return open(descriptor, components, null, metadata, partitioner); } - public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException + public static SSTableReader open(Descriptor descriptor, Set<Component> components, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException { assert partitioner != null; // Minimum components without which we can't do anything @@ -178,11 +178,11 @@ public class SSTableReader extends SSTable // versions before 'c' encoded keys as utf-16 before hashing to the filter if (descriptor.version.hasStringsInBloomFilter) { - sstable.load(true, savedKeys); + sstable.load(true); } else { - sstable.load(false, savedKeys); + sstable.load(false); sstable.loadBloomFilter(); } if (logger.isDebugEnabled()) @@ -203,7 +203,6 @@ public class SSTableReader extends SSTable } public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries, - final Set<DecoratedKey> savedKeys, final DataTracker tracker, final CFMetaData metadata, final IPartitioner partitioner) @@ -220,7 +219,7 @@ public class SSTableReader extends SSTable SSTableReader sstable; try { - sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner); + sstable = open(entry.getKey(), entry.getValue(), tracker, metadata, partitioner); } catch (IOException ex) { @@ -328,10 +327,8 @@ public class SSTableReader extends SSTable /** * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter. */ - private void load(boolean recreatebloom, Set<DecoratedKey> keysToLoadInCache) throws IOException + private void load(boolean recreatebloom) throws IOException { - boolean cacheLoading = keyCache != null && !keysToLoadInCache.isEmpty(); - SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); SegmentedFile.Builder dbuilder = compression ? SegmentedFile.getCompressedBuilder() @@ -343,7 +340,7 @@ public class SSTableReader extends SSTable // try to load summaries from the disk and check if we need // to read primary index because we should re-create a BloomFilter or pre-load KeyCache final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder); - final boolean readIndex = recreatebloom || cacheLoading || !summaryLoaded; + final boolean readIndex = recreatebloom || !summaryLoaded; try { long indexSize = primaryIndex.length(); @@ -369,8 +366,6 @@ public class SSTableReader extends SSTable if (recreatebloom) bf.add(decoratedKey.key); - if (cacheLoading && keysToLoadInCache.contains(decoratedKey)) - cacheKey(decoratedKey, indexEntry); // if summary was already read from disk we don't want to re-populate it using primary index if (!summaryLoaded) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 3a2d8e9..e66d995 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -17,9 +17,15 @@ */ package org.apache.cassandra.service; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -27,10 +33,21 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.cassandra.cache.*; +import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableReader.Operator; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +118,7 @@ public class CacheService implements CacheServiceMBean // as values are constant size we can use singleton weigher // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value) ICache<KeyCacheKey, RowIndexEntry> kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity / AVERAGE_KEY_CACHE_ROW_SIZE); - AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE); + AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<KeyCacheKey, RowIndexEntry>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer()); int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); @@ -127,7 +144,7 @@ public class CacheService implements CacheServiceMBean // cache object ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true); - AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE); + AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE, new RowCacheSerializer()); int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave(); @@ -281,4 +298,91 @@ public class CacheService implements CacheServiceMBean FBUtilities.waitOnFutures(futures); logger.debug("cache saves completed"); } + + public class RowCacheSerializer implements CacheSerializer<RowCacheKey, IRowCacheEntry> + { + public void serialize(RowCacheKey key, DataOutput out) throws IOException + { + ByteBufferUtil.writeWithLength(key.key, out); + } + + public Pair<RowCacheKey, IRowCacheEntry> deserialize(DataInputStream in, ColumnFamilyStore store) throws IOException + { + ByteBuffer buffer = ByteBufferUtil.readWithLength(in); + DecoratedKey key = StorageService.getPartitioner().decorateKey(buffer); + ColumnFamily data = store.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(store.columnFamily)), Integer.MIN_VALUE, true); + return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(store.metadata.cfId, key), data); + } + + @Override + public void load(Set<ByteBuffer> buffers, ColumnFamilyStore store) + { + for (ByteBuffer key : buffers) + { + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + ColumnFamily data = store.getTopLevelColumns(QueryFilter.getIdentityFilter(dk, new QueryPath(store.columnFamily)), Integer.MIN_VALUE, true); + rowCache.put(new RowCacheKey(store.metadata.cfId, dk), data); + } + } + } + + public class KeyCacheSerializer implements CacheSerializer<KeyCacheKey, RowIndexEntry> + { + public void serialize(KeyCacheKey key, DataOutput out) throws IOException + { + RowIndexEntry entry = CacheService.instance.keyCache.get(key); + if (entry == null) + return; + ByteBufferUtil.writeWithLength(key.key, out); + Descriptor desc = key.desc; + out.writeInt(desc.generation); + out.writeBoolean(desc.version.hasPromotedIndexes); + if (!desc.version.hasPromotedIndexes) + return; + RowIndexEntry.serializer.serialize(entry, out); + } + + public Pair<KeyCacheKey, RowIndexEntry> deserialize(DataInputStream input, ColumnFamilyStore store) throws IOException + { + ByteBuffer key = ByteBufferUtil.readWithLength(input); + int generation = input.readInt(); + SSTableReader reader = findDesc(generation, store.getSSTables()); + if (reader == null) + { + RowIndexEntry.serializer.skipPromotedIndex(input); + return null; + } + RowIndexEntry entry; + if (input.readBoolean()) + entry = RowIndexEntry.serializer.deserialize(input, reader.descriptor.version); + else + entry = reader.getPosition(StorageService.getPartitioner().decorateKey(key), Operator.EQ); + return new Pair<KeyCacheKey, RowIndexEntry>(new KeyCacheKey(reader.descriptor, key), entry); + } + + private SSTableReader findDesc(int generation, Collection<SSTableReader> collection) + { + for (SSTableReader sstable : collection) + { + if (sstable.descriptor.generation == generation) + return sstable; + } + return null; + } + + @Override + public void load(Set<ByteBuffer> buffers, ColumnFamilyStore store) + { + for (ByteBuffer key : buffers) + { + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + for (SSTableReader sstable : store.getSSTables()) + { + RowIndexEntry entry = sstable.getPosition(dk, Operator.EQ); + if (entry != null) + keyCache.put(new KeyCacheKey(sstable.descriptor, key), entry); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 85e2c45..3d9ee29 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -472,16 +472,16 @@ public class NodeCmd CompactionManagerMBean cm = probe.getCompactionManagerProxy(); outs.println("pending tasks: " + cm.getPendingTasks()); if (cm.getCompactions().size() > 0) - outs.printf("%25s%16s%16s%16s%16s%10s%n", "compaction type", "keyspace", "column family", "bytes compacted", "bytes total", "progress"); + outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", "compaction type", "keyspace", "column family", "completed", "total", "unit", "progress"); long remainingBytes = 0; for (Map<String, String> c : cm.getCompactions()) { - String percentComplete = new Long(c.get("totalBytes")) == 0 + String percentComplete = new Long(c.get("total")) == 0 ? "n/a" - : new DecimalFormat("0.00").format((double) new Long(c.get("bytesComplete")) / new Long(c.get("totalBytes")) * 100) + "%"; - outs.printf("%25s%16s%16s%16s%16s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("bytesComplete"), c.get("totalBytes"), percentComplete); + : new DecimalFormat("0.00").format((double) new Long(c.get("completed")) / new Long(c.get("total")) * 100) + "%"; + outs.printf("%25s%16s%16s%16s%16s%10s%10s%n", c.get("taskType"), c.get("keyspace"), c.get("columnfamily"), c.get("completed"), c.get("total"), c.get("unit"), percentComplete); if (c.get("taskType").equals(OperationType.COMPACTION.toString())) - remainingBytes += (new Long(c.get("totalBytes")) - new Long(c.get("bytesComplete"))); + remainingBytes += (new Long(c.get("total")) - new Long(c.get("completed"))); } long remainingTimeInSecs = compactionThroughput == 0 || remainingBytes == 0 ? -1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/cfe585c2/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java index e6b4578..99b8cbc 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@ -153,6 +153,6 @@ public class RowCacheTest extends SchemaLoader // empty the cache again to make sure values came from disk CacheService.instance.invalidateRowCache(); assert CacheService.instance.rowCache.size() == 0; - assert CacheService.instance.rowCache.readSaved(KEYSPACE, COLUMN_FAMILY).size() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave); + assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave); } }
