Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de23f20 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de23f20 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de23f20 Branch: refs/heads/trunk Commit: 0de23f20ae4bd95f040017e2db653c6c1b5eabe9 Parents: 9a90e98 e487553 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Nov 11 16:16:23 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Nov 11 16:16:23 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 34 +++++++++++ .../db/compaction/CompactionController.java | 5 -- src/java/org/apache/cassandra/dht/Bounds.java | 62 ++++++++++++++++++++ .../cassandra/streaming/StreamReader.java | 12 ++-- .../cassandra/streaming/StreamReceiveTask.java | 37 +++++++++++- .../compress/CompressedStreamReader.java | 2 +- .../apache/cassandra/db/CounterCacheTest.java | 48 +++++++++++++++ .../org/apache/cassandra/db/RowCacheTest.java | 50 ++++++++++++++++ .../org/apache/cassandra/dht/BoundsTest.java | 61 +++++++++++++++++++ 10 files changed, 298 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index d271c95,0fcf037..02dc249 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,50 -1,6 +1,51 @@@ -2.2.4 +3.0.1 + * Keep the file open in trySkipCache (CASSANDRA-10669) + * Updated trigger example (CASSANDRA-10257) +Merged from 2.2: * (Hadoop) fix splits calculation (CASSANDRA-10640) * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) +Merged from 2.1: ++ * Invalidate cache after stream receive task is completed (CASSANDRA-10341) + * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258) + * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605) + + +3.0 + * Fix AssertionError while flushing memtable due to materialized views + incorrectly inserting empty rows (CASSANDRA-10614) + * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650) + * Don't use -1 for the position of partition key in schema (CASSANDRA-10491) + * Fix distinct queries in mixed version cluster (CASSANDRA-10573) + * Skip sstable on clustering in names query (CASSANDRA-10571) + * Remove value skipping as it breaks read-repair (CASSANDRA-10655) + * Fix bootstrapping with MVs (CASSANDRA-10621) + * Make sure EACH_QUORUM reads are using NTS (CASSANDRA-10584) + * Fix MV replica filtering for non-NetworkTopologyStrategy (CASSANDRA-10634) + * (Hadoop) fix CIF describeSplits() not handling 0 size estimates (CASSANDRA-10600) + * Fix reading of legacy sstables (CASSANDRA-10590) + * Use CQL type names in schema metadata tables (CASSANDRA-10365) + * Guard batchlog replay against integer division by zero (CASSANDRA-9223) + * Fix bug when adding a column to thrift with the same name than a primary key (CASSANDRA-10608) + * Add client address argument to IAuthenticator::newSaslNegotiator (CASSANDRA-8068) + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602) + * Don't use 'names query' read path for counters (CASSANDRA-10572) + * Fix backward compatibility for counters (CASSANDRA-10470) + * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581,10628) + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604) + * Fix thrift cas operations with defined columns (CASSANDRA-10576) + * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606) + * Fix thrift get() queries with defined columns (CASSANDRA-10586) + * Fix marking of indexes as built and removed (CASSANDRA-10601) + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595) + * Fix batches on multiple tables (CASSANDRA-10554) + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569) + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975) + * Remove token generator (CASSANDRA-5261) + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562) + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421) + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360) + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367) +Merged from 2.2: * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) * Use most up-to-date version of schema for system tables (CASSANDRA-10652) * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0b838bf,2d58219..38c99ea --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1739,6 -2519,41 +1739,40 @@@ public class ColumnFamilyStore implemen CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName); } + public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate) + { + int invalidatedKeys = 0; + for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator(); + keyIter.hasNext(); ) + { + RowCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); ++ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key)); + if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(), boundsToInvalidate)) + { - invalidateCachedRow(dk); ++ invalidateCachedPartition(dk); + invalidatedKeys++; + } + } - + return invalidatedKeys; + } + + public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate) + { + int invalidatedKeys = 0; + for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator(); + keyIter.hasNext(); ) + { + CounterCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); ++ DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey)); + if (key.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(dk.getToken(), boundsToInvalidate)) + { + CacheService.instance.counterCache.remove(key); + invalidatedKeys++; + } + } + return invalidatedKeys; + } + /** * @return true if @param key is contained in the row cache */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/dht/Bounds.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/Bounds.java index d9c189d,73414cd..a125168 --- a/src/java/org/apache/cassandra/dht/Bounds.java +++ b/src/java/org/apache/cassandra/dht/Bounds.java @@@ -17,10 -17,19 +17,19 @@@ */ package org.apache.cassandra.dht; + import java.util.ArrayList; + import java.util.Collection; import java.util.Collections; + import java.util.Comparator; import java.util.List; + import java.util.Set; + + import com.google.common.collect.Iterators; + import com.google.common.collect.Lists; + import com.google.common.collect.PeekingIterator; + import com.google.common.collect.Sets; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.utils.Pair; /** @@@ -102,16 -111,20 +111,30 @@@ public class Bounds<T extends RingPosit return "]"; } + public static <T extends RingPosition<T>> boolean isInBounds(T token, Iterable<Bounds<T>> bounds) + { + assert bounds != null; + + for (Bounds<T> bound : bounds) + { + if (bound.contains(token)) + { + return true; + } + } + return false; + } + + public boolean isStartInclusive() + { + return true; + } + + public boolean isEndInclusive() + { + return true; + } + /** * Compute a bounds of keys corresponding to a given bounds of token. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 6169494,fe3b13d..4a38d5b --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -106,7 -102,8 +106,7 @@@ public class StreamReade writer = createWriter(cfs, totalSize, repairedAt, format); while (in.getBytesRead() < totalSize) { - writePartition(deserializer, writer, cfs); - writeRow(writer, in, cfs); - ++ writePartition(deserializer, writer); // TODO move this to BytesReadTracker session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } @@@ -167,122 -167,9 +167,120 @@@ return size; } - protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException - protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException ++ protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException { - DecoratedKey key = deserializer.newPartition(); - writer.append(deserializer); - DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); - writer.appendFromStream(key, cfs.metadata, in, inputVersion); ++ writer.append(deserializer.newPartition()); + deserializer.checkForExceptions(); - cfs.invalidateCachedPartition(key); + } + + public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator + { + private final CFMetaData metadata; + private final DataInputPlus in; + private final SerializationHeader header; + private final SerializationHelper helper; + + private DecoratedKey key; + private DeletionTime partitionLevelDeletion; + private SSTableSimpleIterator iterator; + private Row staticRow; + private IOException exception; + + public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header) + { + assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes"; + this.metadata = metadata; + this.in = in; + this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); + this.header = header; + } + - public DecoratedKey newPartition() throws IOException ++ public StreamDeserializer newPartition() throws IOException + { + key = metadata.decorateKey(ByteBufferUtil.readWithShortLength(in)); + partitionLevelDeletion = DeletionTime.serializer.deserialize(in); + iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); + staticRow = iterator.readStaticRow(); - return key; ++ return this; + } + + public CFMetaData metadata() + { + return metadata; + } + + public PartitionColumns columns() + { + // We don't know which columns we'll get so assume it can be all of them + return metadata.partitionColumns(); + } + + public boolean isReverseOrder() + { + return false; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public EncodingStats stats() + { + return header.stats(); + } + + public boolean hasNext() + { + try + { + return iterator.hasNext(); + } + catch (IOError e) + { + if (e.getCause() != null && e.getCause() instanceof IOException) + { + exception = (IOException)e.getCause(); + return false; + } + throw e; + } + } + + public Unfiltered next() + { + // Note that in practice we know that IOException will be thrown by hasNext(), because that's + // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily + // to what we do in hasNext) + Unfiltered unfiltered = iterator.next(); + return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW + ? maybeMarkLocalToBeCleared((Row) unfiltered) + : unfiltered; + } + + private Row maybeMarkLocalToBeCleared(Row row) + { + return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row; + } + + public void checkForExceptions() throws IOException + { + if (exception != null) + throw exception; + } + + public void close() + { + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 0b864fa,846524b..54ce711 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@@ -29,17 -35,12 +36,19 @@@ import org.apache.cassandra.concurrent. import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.view.View; + import org.apache.cassandra.dht.Bounds; + import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; - import org.apache.cassandra.utils.concurrent.Refs; /** @@@ -128,66 -122,48 +137,92 @@@ public class StreamReceiveTask extends return; } ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right)); - File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L); - if (lockfiledir == null) - throw new IOError(new IOException("All disks full")); - StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID()); - lockfile.create(task.sstables); - List<SSTableReader> readers = new ArrayList<>(); - for (SSTableWriter writer : task.sstables) - readers.add(writer.finish(true)); - lockfile.delete(); - task.sstables.clear(); - - try (Refs<SSTableReader> refs = Refs.ref(readers)) + try { - // add sstables and build secondary indexes - cfs.addSSTables(readers); - cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); - - //invalidate row and counter cache - if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) + List<SSTableReader> readers = new ArrayList<>(); + for (SSTableMultiWriter writer : task.sstables) { - List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); - for (SSTableReader sstable : readers) - boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())); - Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); + Collection<SSTableReader> newReaders = writer.finish(true); + readers.addAll(newReaders); + task.txn.update(newReaders, false); + } + + task.sstables.clear(); - if (cfs.isRowCacheEnabled()) + try (Refs<SSTableReader> refs = Refs.ref(readers)) + { + //We have a special path for views. + //Since the view requires cleaning up any pre-existing state, we must put + //all partitions through the same write path as normal mutations. + //This also ensures any 2is are also updated + if (hasViews) + { + for (SSTableReader reader : readers) + { + try (ISSTableScanner scanner = reader.getScanner()) + { + while (scanner.hasNext()) + { + try (UnfilteredRowIterator rowIterator = scanner.next()) + { + //Apply unsafe (we will flush below before transaction is done) + new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe(); + } + } + } + } + } + else { - int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getColumnFamilyName()); + task.txn.finish(); + + // add sstables and build secondary indexes + cfs.addSSTables(readers); + cfs.indexManager.buildAllIndexesBlocking(readers); ++ ++ //invalidate row and counter cache ++ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter()) ++ { ++ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); ++ readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()))); ++ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); ++ ++ if (cfs.isRowCacheEnabled()) ++ { ++ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); ++ if (invalidatedKeys > 0) ++ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + ++ "receive task completed.", task.session.planId(), invalidatedKeys, ++ cfs.keyspace.getName(), cfs.getTableName()); ++ } ++ ++ if (cfs.metadata.isCounter()) ++ { ++ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); ++ if (invalidatedKeys > 0) ++ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + ++ "receive task completed.", task.session.planId(), invalidatedKeys, ++ cfs.keyspace.getName(), cfs.getTableName()); ++ } ++ } } + } + catch (Throwable t) + { + logger.error("Error applying streamed sstable: ", t); - if (cfs.metadata.isCounter()) + JVMStabilityInspector.inspectThrowable(t); + } + finally + { + //We don't keep the streamed sstables since we've applied them manually + //So we abort the txn and delete the streamed sstables + if (hasViews) { - int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); - if (invalidatedKeys > 0) - logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + - "receive task completed.", task.session.planId(), invalidatedKeys, - cfs.keyspace.getName(), cfs.getColumnFamilyName()); + cfs.forceBlockingFlush(); + task.txn.abort(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index fca6aa7,facb906..8f53832 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -93,7 -92,8 +93,7 @@@ public class CompressedStreamReader ext while (in.getBytesRead() < sectionLength) { - writePartition(deserializer, writer, cfs); - writeRow(writer, in, cfs); - ++ writePartition(deserializer, writer); // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/CounterCacheTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java index 65ec420,ed7921e..91157ad --- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java +++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java @@@ -17,12 -17,9 +17,15 @@@ */ package org.apache.cassandra.db; + import java.util.Collections; import java.util.concurrent.ExecutionException; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; ++import org.apache.cassandra.dht.Bounds; ++import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@@ -95,9 -89,51 +98,54 @@@ public class CounterCacheTes } @Test + public void testCounterCacheInvalidate() + { - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1); + cfs.truncateBlocking(); + CacheService.instance.invalidateCounterCache(); + ++ Clustering c1 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(1)).build(); ++ Clustering c2 = CBuilder.create(cfs.metadata.comparator).add(ByteBufferUtil.bytes(2)).build(); ++ ColumnDefinition cd = cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("c")); ++ + assertEquals(0, CacheService.instance.counterCache.size()); - assertNull(cfs.getCachedCounter(bytes(1), cellname(1))); - assertNull(cfs.getCachedCounter(bytes(1), cellname(2))); - assertNull(cfs.getCachedCounter(bytes(2), cellname(1))); - assertNull(cfs.getCachedCounter(bytes(2), cellname(2))); - assertNull(cfs.getCachedCounter(bytes(3), cellname(1))); - assertNull(cfs.getCachedCounter(bytes(3), cellname(2))); - - cfs.putCachedCounter(bytes(1), cellname(1), ClockAndCount.create(1L, 1L)); - cfs.putCachedCounter(bytes(1), cellname(2), ClockAndCount.create(1L, 2L)); - cfs.putCachedCounter(bytes(2), cellname(1), ClockAndCount.create(2L, 1L)); - cfs.putCachedCounter(bytes(2), cellname(2), ClockAndCount.create(2L, 2L)); - cfs.putCachedCounter(bytes(3), cellname(1), ClockAndCount.create(3L, 1L)); - cfs.putCachedCounter(bytes(3), cellname(2), ClockAndCount.create(3L, 2L)); - - assertEquals(6, CacheService.instance.counterCache.size()); - assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), cellname(1))); - assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), cellname(2))); - assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), cellname(1))); - assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), cellname(2))); - assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1))); - assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2))); - - cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.partitioner.decorateKey(bytes(1)).getToken(), - cfs.partitioner.decorateKey(bytes(2)).getToken()))); ++ assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(3), c1, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(3), c2, cd, null)); ++ ++ cfs.putCachedCounter(bytes(1), c1, cd, null, ClockAndCount.create(1L, 1L)); ++ cfs.putCachedCounter(bytes(1), c2, cd, null, ClockAndCount.create(1L, 2L)); ++ cfs.putCachedCounter(bytes(2), c1, cd, null, ClockAndCount.create(2L, 1L)); ++ cfs.putCachedCounter(bytes(2), c2, cd, null, ClockAndCount.create(2L, 2L)); ++ cfs.putCachedCounter(bytes(3), c1, cd, null, ClockAndCount.create(3L, 1L)); ++ cfs.putCachedCounter(bytes(3), c2, cd, null, ClockAndCount.create(3L, 2L)); ++ ++ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(1), c1, cd, null)); ++ assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(1), c2, cd, null)); ++ assertEquals(ClockAndCount.create(2L, 1L), cfs.getCachedCounter(bytes(2), c1, cd, null)); ++ assertEquals(ClockAndCount.create(2L, 2L), cfs.getCachedCounter(bytes(2), c2, cd, null)); ++ assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd, null)); ++ assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd, null)); ++ ++ cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(cfs.decorateKey(bytes(1)).getToken(), ++ cfs.decorateKey(bytes(2)).getToken()))); + + assertEquals(2, CacheService.instance.counterCache.size()); - assertNull(cfs.getCachedCounter(bytes(1), cellname(1))); - assertNull(cfs.getCachedCounter(bytes(1), cellname(2))); - assertNull(cfs.getCachedCounter(bytes(2), cellname(1))); - assertNull(cfs.getCachedCounter(bytes(2), cellname(2))); - assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), cellname(1))); - assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), cellname(2))); ++ assertNull(cfs.getCachedCounter(bytes(1), c1, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(1), c2, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(2), c1, cd, null)); ++ assertNull(cfs.getCachedCounter(bytes(2), c2, cd, null)); ++ assertEquals(ClockAndCount.create(3L, 1L), cfs.getCachedCounter(bytes(3), c1, cd, null)); ++ assertEquals(ClockAndCount.create(3L, 2L), cfs.getCachedCounter(bytes(3), c2, cd, null)); + } + + @Test public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException { - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(COUNTER1); cfs.truncateBlocking(); CacheService.instance.invalidateCounterCache(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de23f20/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java index d407f7a,9fb322b..b157adc --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@@ -20,22 -20,28 +20,27 @@@ package org.apache.cassandra.db import java.net.InetAddress; import java.nio.ByteBuffer; + import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; import java.util.Iterator; + import java.util.TreeSet; + import com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; -import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.RowCacheKey; -import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.partitions.CachedPartition; + import org.apache.cassandra.dht.Bounds; + import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; import org.apache.cassandra.locator.TokenMetadata; @@@ -230,6 -178,51 +235,51 @@@ public class RowCacheTes } @Test + public void testInvalidateRowCache() throws Exception + { + StorageService.instance.initServer(0); + CacheService.instance.setRowCacheCapacityInMB(1); + rowCacheLoad(100, Integer.MAX_VALUE, 1000); + + ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED); + assertEquals(CacheService.instance.rowCache.size(), 100); + - //construct 5 ranges of 20 elements each ++ //construct 5 bounds of 20 elements each + ArrayList<Bounds<Token>> subranges = getBounds(20); + - //invalidate 3 of the 5 ranges ++ //invalidate 3 of the 5 bounds + ArrayList<Bounds<Token>> boundsToInvalidate = Lists.newArrayList(subranges.get(0), subranges.get(2), subranges.get(4)); + int invalidatedKeys = store.invalidateRowCache(boundsToInvalidate); + assertEquals(60, invalidatedKeys); + + //now there should be only 40 cached entries left + assertEquals(CacheService.instance.rowCache.size(), 40); + CacheService.instance.setRowCacheCapacityInMB(0); + } + + private ArrayList<Bounds<Token>> getBounds(int nElements) + { + ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED); + TreeSet<DecoratedKey> orderedKeys = new TreeSet<>(); + + for(Iterator<RowCacheKey> it = CacheService.instance.rowCache.keyIterator();it.hasNext();) - orderedKeys.add(store.partitioner.decorateKey(ByteBuffer.wrap(it.next().key))); ++ orderedKeys.add(store.decorateKey(ByteBuffer.wrap(it.next().key))); + + ArrayList<Bounds<Token>> boundsToInvalidate = new ArrayList<>(); + Iterator<DecoratedKey> iterator = orderedKeys.iterator(); + + while (iterator.hasNext()) + { + Token startRange = iterator.next().getToken(); + for (int i = 0; i < nElements-2; i++) + iterator.next(); + Token endRange = iterator.next().getToken(); + boundsToInvalidate.add(new Bounds<>(startRange, endRange)); + } + return boundsToInvalidate; + } + + @Test public void testRowCachePartialLoad() throws Exception { CacheService.instance.setRowCacheCapacityInMB(1);