Updated Branches: refs/heads/trunk e1ee63602 -> 08b309191
parallelize row cache loading patch by jbellis; reviewed by vijay for CASSANDRA-4282 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08b30919 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08b30919 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08b30919 Branch: refs/heads/trunk Commit: 08b3091914e99cc3c4ebded66184bb0882e5dd8a Parents: e1ee636 Author: Jonathan Ellis <[email protected]> Authored: Tue Aug 21 20:33:29 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Aug 27 17:19:08 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 15 +++++++-- .../cassandra/config/DatabaseDescriptor.java | 2 +- .../org/apache/cassandra/service/CacheService.java | 25 ++++++++++---- 4 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2a10d98..ffd9485 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-dev + * parallelize row cache loading (CASSANDRA-4282) * Make compaction, flush JBOD-aware (CASSANDRA-4292) * run local range scans on the read stage (CASSANDRA-3687) * clean up ioexceptions (CASSANDRA-2116) http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 7ce2beb..2e86672 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -100,6 +100,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { int count = 0; long start = System.currentTimeMillis(); + + // old cache format that only saves keys File path = getCachePath(cfs.table.name, cfs.columnFamily, null); if (path.exists()) { @@ -127,6 +129,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } } + // modern format, allows both key and value (so key cache load can be purely sequential) path = getCachePath(cfs.table.name, cfs.columnFamily, CURRENT_VERSION); if (path.exists()) { @@ -135,14 +138,20 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { logger.info(String.format("reading saved cache %s", path)); in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); + List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>(); while (in.available() > 0) { - Pair<K, V> entry = cacheLoader.deserialize(in, cfs); + futures.add(cacheLoader.deserialize(in, cfs)); + count++; + } + + for (Future<Pair<K, V>> future : futures) + { + Pair<K, V> entry = future.get(); // Key cache entry can return null, if the SSTable doesn't exist. if (entry == null) continue; put(entry.left, entry.right); - count++; } } catch (Exception e) @@ -314,7 +323,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K { void serialize(K key, DataOutput out) throws IOException; - Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; + Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; @Deprecated void load(Set<ByteBuffer> buffer, ColumnFamilyStore cfs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1c389b4..0cb1d1b 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1044,7 +1044,7 @@ public class DatabaseDescriptor public static File getSerializedCachePath(String ksName, String cfName, CacheService.CacheType cacheType, String version) { - return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + ((version != null) ? "-" + version + ".db" : "")); + return new File(conf.saved_caches_directory + File.separator + ksName + "-" + cfName + "-" + cacheType + (version == null ? "" : "-" + version + ".db")); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/08b30919/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 8446b8d..25a38ef 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -26,14 +26,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.util.concurrent.Futures; + import org.apache.cassandra.cache.*; import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; @@ -320,12 +325,18 @@ public class CacheService implements CacheServiceMBean ByteBufferUtil.writeWithLength(key.key, out); } - public Pair<RowCacheKey, IRowCacheEntry> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException + public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException { - ByteBuffer buffer = ByteBufferUtil.readWithLength(in); - DecoratedKey key = cfs.partitioner.decorateKey(buffer); - ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true); - return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(cfs.metadata.cfId, key), data); + final ByteBuffer buffer = ByteBufferUtil.readWithLength(in); + return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>() + { + public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception + { + DecoratedKey key = cfs.partitioner.decorateKey(buffer); + ColumnFamily data = cfs.getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(cfs.columnFamily)), Integer.MIN_VALUE, true); + return new Pair<RowCacheKey, IRowCacheEntry>(new RowCacheKey(cfs.metadata.cfId, key), data); + } + }); } public void load(Set<ByteBuffer> buffers, ColumnFamilyStore cfs) @@ -355,7 +366,7 @@ public class CacheService implements CacheServiceMBean RowIndexEntry.serializer.serialize(entry, out); } - public Pair<KeyCacheKey, RowIndexEntry> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException + public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException { ByteBuffer key = ByteBufferUtil.readWithLength(input); int generation = input.readInt(); @@ -370,7 +381,7 @@ public class CacheService implements CacheServiceMBean entry = RowIndexEntry.serializer.deserialize(input, reader.descriptor.version); else entry = reader.getPosition(reader.partitioner.decorateKey(key), Operator.EQ); - return new Pair<KeyCacheKey, RowIndexEntry>(new KeyCacheKey(reader.descriptor, key), entry); + return Futures.immediateFuture(Pair.create(new KeyCacheKey(reader.descriptor, key), entry)); } private SSTableReader findDesc(int generation, Collection<SSTableReader> collection)
