Fix race between writes and reads for the row cache

Patch by jbellis and slebresne; reviewed by jbellis and slebresne for CASSANDRA-3862
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9270f4e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9270f4e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9270f4e Branch: refs/heads/trunk Commit: c9270f4e3ae5f94d46070f1c7e585c90bc68df7c Parents: aa75168 Author: Sylvain Lebresne <[email protected]> Authored: Tue Feb 28 18:53:32 2012 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Feb 28 18:53:32 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cache/ConcurrentLinkedHashCache.java | 10 ++ .../cache/ConcurrentLinkedHashCacheProvider.java | 13 +- src/java/org/apache/cassandra/cache/ICache.java | 4 + .../org/apache/cassandra/cache/IRowCacheEntry.java | 5 + .../apache/cassandra/cache/IRowCacheProvider.java | 2 +- .../apache/cassandra/cache/InstrumentingCache.java | 10 ++ .../apache/cassandra/cache/RowCacheSentinel.java | 45 ++++++ .../apache/cassandra/cache/SerializingCache.java | 35 ++++- .../cassandra/cache/SerializingCacheProvider.java | 47 ++++++- src/java/org/apache/cassandra/db/ColumnFamily.java | 9 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 119 +++++++++++---- .../apache/cassandra/db/RowIteratorFactory.java | 2 + .../db/compaction/CompactionIterable.java | 2 +- .../db/compaction/ParallelCompactionIterable.java | 2 +- .../org/apache/cassandra/service/CacheService.java | 8 +- .../cassandra/streaming/IncomingStreamReader.java | 3 +- .../org/apache/cassandra/utils/StatusLogger.java | 3 +- .../unit/org/apache/cassandra/db/RowCacheTest.java | 4 +- 19 files changed, 264 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 
41316eb..b5b79a9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,7 @@ * ignore deprecated KsDef/CfDef/ColumnDef fields in native schema (CASSANDRA-3963) * CLI to report when unsupported column_metadata pair was given (CASSANDRA-3959) * reincarnate removed and deprecated KsDef/CfDef attributes (CASSANDRA-3953) + * Fix race between writes and read for cache (CASSANDRA-3862) Merged from 1.0: * remove the wait on hint future during write (CASSANDRA-3870) * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java index 8f4d2f0..a1cf4ea 100644 --- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java +++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java @@ -117,6 +117,16 @@ public class ConcurrentLinkedHashCache<K, V> implements ICache<K, V> map.put(key, value); } + public boolean putIfAbsent(K key, V value) + { + return map.putIfAbsent(key, value) == null; + } + + public boolean replace(K key, V old, V value) + { + return map.replace(key, old, value); + } + public void remove(K key) { map.remove(key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java index 851d4c5..71babd6 100644 --- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCacheProvider.java @@ -20,8 +20,6 @@ package 
org.apache.cassandra.cache; * */ -import org.apache.cassandra.db.ColumnFamily; - import com.googlecode.concurrentlinkedhashmap.Weigher; import com.googlecode.concurrentlinkedhashmap.Weighers; @@ -29,21 +27,20 @@ import org.github.jamm.MemoryMeter; public class ConcurrentLinkedHashCacheProvider implements IRowCacheProvider { - public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher) + public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher) { return ConcurrentLinkedHashCache.create(capacity, useMemoryWeigher ? createMemoryWeigher() - : Weighers.<ColumnFamily>singleton()); + : Weighers.<IRowCacheEntry>singleton()); } - private static Weigher<ColumnFamily> createMemoryWeigher() + private static Weigher<IRowCacheEntry> createMemoryWeigher() { - return new Weigher<ColumnFamily>() + return new Weigher<IRowCacheEntry>() { final MemoryMeter meter = new MemoryMeter(); - @Override - public int weightOf(ColumnFamily value) + public int weightOf(IRowCacheEntry value) { return (int) Math.min(meter.measure(value), Integer.MAX_VALUE); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/ICache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ICache.java b/src/java/org/apache/cassandra/cache/ICache.java index 48e045c..5f8e00b 100644 --- a/src/java/org/apache/cassandra/cache/ICache.java +++ b/src/java/org/apache/cassandra/cache/ICache.java @@ -36,6 +36,10 @@ public interface ICache<K, V> public void put(K key, V value); + public boolean putIfAbsent(K key, V value); + + public boolean replace(K key, V old, V value); + public V get(K key); public void remove(K key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheEntry.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cache/IRowCacheEntry.java b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java new file mode 100644 index 0000000..7340428 --- /dev/null +++ b/src/java/org/apache/cassandra/cache/IRowCacheEntry.java @@ -0,0 +1,5 @@ +package org.apache.cassandra.cache; + +public interface IRowCacheEntry +{ +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/IRowCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java index 9209ced..9e1eb7c 100644 --- a/src/java/org/apache/cassandra/cache/IRowCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/IRowCacheProvider.java @@ -27,5 +27,5 @@ import org.apache.cassandra.db.ColumnFamily; */ public interface IRowCacheProvider { - public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher); + public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/InstrumentingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java index 36630ac..b4d048f 100644 --- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java +++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java @@ -45,6 +45,16 @@ public class InstrumentingCache<K, V> map.put(key, value); } + public boolean putIfAbsent(K key, V value) + { + return map.putIfAbsent(key, value); + } + + public boolean replace(K key, V old, V value) + { + return map.replace(key, old, value); + } + public V get(K key) { V v = map.get(key); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/RowCacheSentinel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/RowCacheSentinel.java b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java new file mode 100644 index 0000000..381160a --- /dev/null +++ b/src/java/org/apache/cassandra/cache/RowCacheSentinel.java @@ -0,0 +1,45 @@ +package org.apache.cassandra.cache; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ArrayBackedSortedColumns; +import org.apache.cassandra.db.ColumnFamily; + +/** + * A sentinel object for row caches. See comments to getThroughCache and CASSANDRA-3862. + */ +public class RowCacheSentinel implements IRowCacheEntry +{ + private static final AtomicLong generator = new AtomicLong(); + + final long sentinelId; + + public RowCacheSentinel() + { + sentinelId = generator.getAndIncrement(); + } + + RowCacheSentinel(long sentinelId) + { + this.sentinelId = sentinelId; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof RowCacheSentinel)) return false; + + RowCacheSentinel other = (RowCacheSentinel) o; + return this.sentinelId == other.sentinelId; + } + + @Override + public int hashCode() + { + return Objects.hashCode(sentinelId); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java index b8844cb..4946fb0 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCache.java +++ b/src/java/org/apache/cassandra/cache/SerializingCache.java @@ -76,7 +76,6 @@ public class 
SerializingCache<K, V> implements ICache<K, V> { return new Weigher<FreeableMemory>() { - @Override public int weightOf(FreeableMemory value) { return (int) Math.min(value.size(), Integer.MAX_VALUE); @@ -182,6 +181,40 @@ public class SerializingCache<K, V> implements ICache<K, V> old.unreference(); } + public boolean putIfAbsent(K key, V value) + { + FreeableMemory mem = serialize(value); + if (mem == null) + return false; // out of memory. never mind. + + FreeableMemory old = map.putIfAbsent(key, mem); + if (old != null) + // the new value was not put, we've uselessly allocated some memory, free it + mem.unreference(); + return old == null; + } + + public boolean replace(K key, V oldToReplace, V value) + { + // if there is no old value in our map, we fail + FreeableMemory old = map.get(key); + if (old == null) + return false; + + // see if the old value matches the one we want to replace + FreeableMemory mem = serialize(value); + if (mem == null) + return false; // out of memory. never mind. 
+ V oldValue = deserialize(old); + boolean success = oldValue.equals(oldToReplace) && map.replace(key, old, mem); + + if (success) + old.unreference(); + else + mem.unreference(); + return success; + } + public void remove(K key) { FreeableMemory mem = map.remove(key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java index f71684b..3a06d36 100644 --- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java +++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java @@ -20,12 +20,55 @@ package org.apache.cassandra.cache; * */ +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOError; +import java.io.IOException; + import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.DBConstants; +import org.apache.cassandra.io.ISerializer; public class SerializingCacheProvider implements IRowCacheProvider { - public ICache<RowCacheKey, ColumnFamily> create(int capacity, boolean useMemoryWeigher) + public ICache<RowCacheKey, IRowCacheEntry> create(int capacity, boolean useMemoryWeigher) + { + return new SerializingCache<RowCacheKey, IRowCacheEntry>(capacity, useMemoryWeigher, new RowCacheSerializer()); + } + + private static class RowCacheSerializer implements ISerializer<IRowCacheEntry> { - return new SerializingCache<RowCacheKey, ColumnFamily>(capacity, useMemoryWeigher, ColumnFamily.serializer()); + public void serialize(IRowCacheEntry cf, DataOutput out) + { + assert cf != null; // unlike CFS we don't support nulls, since there is no need for that in the cache + try + { + out.writeBoolean(cf instanceof RowCacheSentinel); + if (cf instanceof RowCacheSentinel) + out.writeLong(((RowCacheSentinel) cf).sentinelId); + else + 
ColumnFamily.serializer.serialize((ColumnFamily) cf, out); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + public IRowCacheEntry deserialize(DataInput in) throws IOException + { + boolean isSentinel = in.readBoolean(); + if (isSentinel) + return new RowCacheSentinel(in.readLong()); + return ColumnFamily.serializer.deserialize(in); + } + + public long serializedSize(IRowCacheEntry cf) + { + return DBConstants.boolSize + + (cf instanceof RowCacheSentinel + ? DBConstants.intSize + DBConstants.longSize + : ColumnFamily.serializer().serializedSize((ColumnFamily) cf)); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 9191df6..740a0a6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -25,6 +25,7 @@ import java.security.MessageDigest; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.filter.QueryPath; @@ -36,10 +37,10 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.HeapAllocator; -public class ColumnFamily extends AbstractColumnContainer +public class ColumnFamily extends AbstractColumnContainer implements IRowCacheEntry { - /* The column serializer for this Column Family. Create based on config. 
*/ - private static ColumnFamilySerializer serializer = new ColumnFamilySerializer(); + public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer(); + private final CFMetaData cfm; public static ColumnFamilySerializer serializer() @@ -77,7 +78,7 @@ public class ColumnFamily extends AbstractColumnContainer return new ColumnFamily(cfm, factory.create(cfm.comparator, reversedInsertOrder)); } - private ColumnFamily(CFMetaData cfm, ISortedColumns map) + protected ColumnFamily(CFMetaData cfm, ISortedColumns map) { super(map); assert cfm != null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 165b150..a8d1fc8 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -18,7 +18,9 @@ package org.apache.cassandra.db; -import java.io.*; +import java.io.File; +import java.io.IOError; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.*; @@ -28,14 +30,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import javax.management.*; -import com.google.common.collect.*; - -import org.apache.cassandra.io.compress.CompressionParameters; -import org.apache.cassandra.service.CacheService; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.*; +import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.IRowCacheEntry; +import org.apache.cassandra.cache.RowCacheKey; 
+import org.apache.cassandra.cache.RowCacheSentinel; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; @@ -54,9 +59,11 @@ import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.utils.*; @@ -386,13 +393,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long start = System.currentTimeMillis(); - AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache; + AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache; // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup int cachedRowsRead = 0; for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily)) { - cacheRow(metadata.cfId, key); + ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)), + Integer.MIN_VALUE, + true); + CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, key), data); } if (cachedRowsRead > 0) @@ -708,15 +718,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key); + // always invalidate a copying cache value if (CacheService.instance.rowCache.isPutCopying()) { invalidateCachedRow(cacheKey); + return; } - else + + // invalidate a normal cache value if it's a 
sentinel, so the read will retry (and include the new update) + IRowCacheEntry cachedRow = getCachedRowInternal(cacheKey); + if (cachedRow != null) { - ColumnFamily cachedRow = getRawCachedRow(cacheKey); - if (cachedRow != null) - cachedRow.addAll(columnFamily, HeapAllocator.instance); + if (cachedRow instanceof RowCacheSentinel) + invalidateCachedRow(cacheKey); + else + ((ColumnFamily) cachedRow).addAll(columnFamily, HeapAllocator.instance); } } @@ -1088,30 +1104,52 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return (int) (System.currentTimeMillis() / 1000) - metadata.getGcGraceSeconds(); } - public ColumnFamily cacheRow(Integer cfId, DecoratedKey decoratedKey) + /** + * fetch the row given by filter.key if it is in the cache; if not, read it from disk and cache it + * @param cfId the column family to read the row from + * @param filter the columns being queried. Note that we still cache entire rows, but if a row is uncached + * and we race to cache it, only the winner will read the entire row + * @return the entire row for filter.key, if present in the cache (or we can cache it), or just the column + * specified by filter otherwise + */ + private ColumnFamily getThroughCache(Integer cfId, QueryFilter filter) { assert isRowCacheEnabled() - : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]"); + : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]"); - RowCacheKey key = new RowCacheKey(cfId, decoratedKey); + RowCacheKey key = new RowCacheKey(cfId, filter.key); - ColumnFamily cached; - - if ((cached = CacheService.instance.rowCache.get(key)) == null) + // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our + // (now potentially obsolete) data, but won't cache it. 
see CASSANDRA-3862 + IRowCacheEntry cached = CacheService.instance.rowCache.get(key); + if (cached != null) { - // We force ThreadSafeSortedColumns because cached row will be accessed concurrently - cached = getTopLevelColumns(QueryFilter.getIdentityFilter(decoratedKey, new QueryPath(columnFamily)), - Integer.MIN_VALUE, - true); + if (cached instanceof RowCacheSentinel) + { + // Some other read is trying to cache the value, just do a normal non-caching read + return getTopLevelColumns(filter, Integer.MIN_VALUE, false); + } + return (ColumnFamily) cached; + } - if (cached == null) - return null; + RowCacheSentinel sentinel = new RowCacheSentinel(); + boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); - // avoid keeping a permanent reference to the original key buffer - CacheService.instance.rowCache.put(key, cached); - } + try + { + ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, new QueryPath(columnFamily)), + Integer.MIN_VALUE, + true); + if (sentinelSuccess && data != null) + CacheService.instance.rowCache.replace(key, sentinel, data); - return cached; + return data; + } + finally + { + if (sentinelSuccess && data == null) + CacheService.instance.rowCache.remove(key); + } } ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore) @@ -1137,7 +1175,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (cfId == null) return null; // secondary index - ColumnFamily cached = cacheRow(cfId, filter.key); + ColumnFamily cached = getThroughCache(cfId, filter); if (cached == null) return null; @@ -1484,21 +1522,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return data.getSSTables().size(); } - /** raw cached row -- does not fetch the row if it is not present. not counted in cache statistics. */ - + /** + * @return the cached row for @param key if it is already present in the cache. 
+ * That is, unlike getThroughCache, it will not readAndCache the row if it is not present, nor + * are these calls counted in cache statistics. + * + * Note that this WILL cause deserialization of a SerializingCache row, so if all you + * need to know is whether a row is present or not, use containsCachedRow instead. + */ public ColumnFamily getRawCachedRow(DecoratedKey key) { if (metadata.cfId == null) return null; // secondary index - return getRawCachedRow(new RowCacheKey(metadata.cfId, key)); + IRowCacheEntry cached = getCachedRowInternal(new RowCacheKey(metadata.cfId, key)); + return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached; } - public ColumnFamily getRawCachedRow(RowCacheKey key) + private IRowCacheEntry getCachedRowInternal(RowCacheKey key) { return CacheService.instance.rowCache.getCapacity() == 0 ? null : CacheService.instance.rowCache.getInternal(key); } + /** + * @return true if @param key is contained in the row cache + */ + public boolean containsCachedRow(DecoratedKey key) + { + return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key)); + } + public void invalidateCachedRow(RowCacheKey key) { CacheService.instance.rowCache.remove(key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/RowIteratorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java index 34fe07a..a31f6ec 100644 --- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java +++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java @@ -103,8 +103,10 @@ public class RowIteratorFactory // First check if this row is in the rowCache. 
If it is we can skip the rest ColumnFamily cached = cfs.getRawCachedRow(key); if (cached == null) + { // not cached: collate filter.collateColumns(returnCF, colIters, gcBefore); + } else { QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index 270c3af..2fd0240 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -110,7 +110,7 @@ public class CompactionIterable extends AbstractCompactionIterable // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look // like some deleted columns lived longer than gc_grace + compaction. 
This can also free up big amount of // memory on long running instances - controller.removeDeletedInCache(compactedRow.key); + controller.invalidateCachedRow(compactedRow.key); } return compactedRow; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index dba8f55..9b67676 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -130,7 +130,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look // like some deleted columns lived longer than gc_grace + compaction. 
This can also free up big amount of // memory on long running instances - controller.removeDeletedInCache(compactedRow.key); + controller.invalidateCachedRow(compactedRow.key); return compactedRow; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 8dd6bde..c70c45d 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -62,7 +62,7 @@ public class CacheService implements CacheServiceMBean public final static CacheService instance = new CacheService(); public final AutoSavingCache<KeyCacheKey, Long> keyCache; - public final AutoSavingCache<RowCacheKey, ColumnFamily> rowCache; + public final AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache; private int rowCacheSavePeriod; private int keyCacheSavePeriod; @@ -116,7 +116,7 @@ public class CacheService implements CacheServiceMBean /** * @return initialized row cache */ - private AutoSavingCache<RowCacheKey, ColumnFamily> initRowCache() + private AutoSavingCache<RowCacheKey, IRowCacheEntry> initRowCache() { logger.info("Initializing row cache with capacity of {} MBs and provider {}", DatabaseDescriptor.getRowCacheSizeInMB(), @@ -125,8 +125,8 @@ public class CacheService implements CacheServiceMBean int rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024; // cache object - ICache<RowCacheKey, ColumnFamily> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true); - AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = new AutoSavingCache<RowCacheKey, ColumnFamily>(rc, CacheType.ROW_CACHE); + ICache<RowCacheKey, IRowCacheEntry> rc = DatabaseDescriptor.getRowCacheProvider().create(rowCacheInMemoryCapacity, true); + 
AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<RowCacheKey, IRowCacheEntry>(rc, CacheType.ROW_CACHE); int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java index f57b400..915d3bc 100644 --- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java @@ -124,8 +124,7 @@ public class IncomingStreamReader key = SSTableReader.decodeKey(StorageService.getPartitioner(), localFile.desc, ByteBufferUtil.readWithShortLength(in)); long dataSize = SSTableReader.readRowSize(in, localFile.desc); - ColumnFamily cached = cfs.getRawCachedRow(key); - if (cached != null && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit()) + if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit()) { // need to update row cache // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/src/java/org/apache/cassandra/utils/StatusLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java index 9d1ff68..1185315 100644 --- a/src/java/org/apache/cassandra/utils/StatusLogger.java +++ b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -31,6 +31,7 @@ import javax.management.ObjectName; import 
com.google.common.collect.Iterables; import org.apache.cassandra.cache.AutoSavingCache; +import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.db.ColumnFamily; @@ -90,7 +91,7 @@ public class StatusLogger // Global key/row cache information AutoSavingCache<KeyCacheKey, Long> keyCache = CacheService.instance.keyCache; - AutoSavingCache<RowCacheKey, ColumnFamily> rowCache = CacheService.instance.rowCache; + AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache; int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9270f4e/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java index 9bce4e8..da5cf63 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@ -66,7 +66,7 @@ public class RowCacheTest extends CleanupHelper cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1); assert CacheService.instance.rowCache.size() == i + 1; - assert cachedStore.getRawCachedRow(key) != null; // current key should be stored in the cache + assert cachedStore.containsCachedRow(key); // current key should be stored in the cache // checking if column is read correctly after cache ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1); @@ -88,7 +88,7 @@ public class RowCacheTest extends CleanupHelper QueryPath path = new QueryPath(COLUMN_FAMILY, null, ByteBufferUtil.bytes("col" + i)); 
cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1); - assert cachedStore.getRawCachedRow(key) != null; // cache should be populated with the latest rows read (old ones should be popped) + assert cachedStore.containsCachedRow(key); // cache should be populated with the latest rows read (old ones should be popped) // checking if column is read correctly after cache ColumnFamily cf = cachedStore.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
