http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index f427389,0000000..5c51fbb mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@@ -1,261 -1,0 +1,261 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.sstable.format.big; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.cassandra.cache.KeyCacheKey; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen. + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead. + */ +public class BigTableReader extends SSTableReader +{ + private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); + + BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason) + { + super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns) + { + return new SSTableNamesIterator(this, key, columns); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry ) + { + return new SSTableNamesIterator(this, input, key, columns, indexEntry); + } + + public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse) + { + return new SSTableSliceIterator(this, key, slices, reverse); + } + + public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry) + { + return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry); + } + /** + * + * @param dataRange filter to use when reading the columns + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter) + { + return BigTableScanner.getScanner(this, dataRange, limiter); + } + + + /** + * Direct I/O SSTableScanner over a defined collection of ranges of tokens. + * + * @param ranges the range of keys to cover + * @return A Scanner for seeking over the rows of the SSTable. + */ + public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter) + { + return BigTableScanner.getScanner(this, ranges, limiter); + } + + + /** + * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to + * allow key selection by token bounds but only if op != * EQ + * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins. + * @param updateCacheAndStats true if updating stats and cache + * @return The index entry corresponding to the key, or null if the key is not present + */ + protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast) + { + if (op == Operator.EQ) + { + assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key + if (!bf.isPresent((DecoratedKey)key)) + { + Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation); + return null; + } + } + + // next, the key cache (only make sense for valid row key) + if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey)) + { + DecoratedKey decoratedKey = (DecoratedKey)key; - KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey()); ++ KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, decoratedKey.getKey()); + RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats); + if (cachedPosition != null) + { + Tracing.trace("Key cache hit for sstable {}", descriptor.generation); + return cachedPosition; + } + } + + // check the smallest and greatest keys in the sstable to see if it can't be present + boolean skip = false; + if (key.compareTo(first) < 0) + { + if (op == Operator.EQ) + skip = true; + else + key = first; + + op = Operator.EQ; + } + else + { + int l = last.compareTo(key); + // l <= 0 => we may be looking past the end of the file; we then narrow our behaviour to: + // 1) skipping if strictly greater for GE and EQ; + // 2) skipping if equal and searching GT, and we aren't permitting matching past last + skip = l <= 0 && (l < 0 || (!permitMatchPastLast && op == Operator.GT)); + } + if (skip) + { + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation); + return null; + } + + int binarySearchResult = indexSummary.binarySearch(key); + long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary); + int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult); + + int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex); + + if (ifile == null) + return null; + + // scan the on-disk index, starting at the nearest sampled position. + // The check against IndexInterval is to be exit the loop in the EQ case when the key looked for is not present + // (bloom filter false positive). But note that for non-EQ cases, we might need to check the first key of the + // next index position because the searched key can be greater the last key of the index interval checked if it + // is lesser than the first key of next interval (and in that case we must return the position of the first key + // of the next interval). + int i = 0; + Iterator<FileDataInput> segments = ifile.iterator(sampledPosition); + while (segments.hasNext()) + { + String path = null; + try (FileDataInput in = segments.next()) + { + path = in.getPath(); + while (!in.isEOF()) + { + i++; + + ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); + + boolean opSatisfied; // did we find an appropriate position for the op requested + boolean exactMatch; // is the current position an exact match for the key, suitable for caching + + // Compare raw keys if possible for performance, otherwise compare decorated keys. + if (op == Operator.EQ && i <= effectiveInterval) + { + opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey()); + } + else + { + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + int comparison = indexDecoratedKey.compareTo(key); + int v = op.apply(comparison); + opSatisfied = (v == 0); + exactMatch = (comparison == 0); + if (v < 0) + { + Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); + return null; + } + } + + if (opSatisfied) + { + // read data position from index entry + RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version); + if (exactMatch && updateCacheAndStats) + { + assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key + DecoratedKey decoratedKey = (DecoratedKey)key; + + if (logger.isTraceEnabled()) + { + // expensive sanity check! see CASSANDRA-4687 + try (FileDataInput fdi = dfile.getSegment(indexEntry.position)) + { + DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + if (!keyInDisk.equals(key)) + throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); + } + } + + // store exact match for the key + cacheKey(decoratedKey, indexEntry); + } + if (op == Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addTruePositive(); + Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation); + return indexEntry; + } + + RowIndexEntry.Serializer.skip(in); + } + } + catch (IOException e) + { + markSuspect(); + throw new CorruptSSTableException(e, path); + } + } + + if (op == SSTableReader.Operator.EQ && updateCacheAndStats) + bloomFilterTracker.addFalsePositive(); + Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation); + return null; + } + + +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CacheService.java index a775627,50d8903..a13a52d --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@@ -267,9 -282,9 +266,9 @@@ public class CacheService implements Ca keyCache.clear(); } - public void invalidateKeyCacheForCf(UUID cfId) + public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName) { - Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator(); + Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator(); while (keyCacheIterator.hasNext()) { KeyCacheKey key = keyCacheIterator.next(); @@@ -283,9 -298,9 +282,9 @@@ rowCache.clear(); } - public void invalidateRowCacheForCf(UUID cfId) + public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName) { - Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator(); + Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator(); while (rowCacheIterator.hasNext()) { RowCacheKey rowCacheKey = rowCacheIterator.next(); @@@ -294,9 -309,9 +293,9 @@@ } } - public void invalidateCounterCacheForCf(UUID cfId) + public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName) { - Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator(); + Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator(); while (counterCacheIterator.hasNext()) { CounterCacheKey counterCacheKey = counterCacheIterator.next(); @@@ -423,7 -487,7 +435,7 @@@ ByteBufferUtil.writeWithLength(key.key, out); out.writeInt(key.desc.generation); out.writeBoolean(true); - key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out); - cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out); ++ key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out); } public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException @@@ -436,15 -502,15 +450,15 @@@ } ByteBuffer key = ByteBufferUtil.read(input, keyLength); int generation = input.readInt(); - SSTableReader reader = findDesc(generation, cfs.getSSTables()); input.readBoolean(); // backwards compatibility for "promoted indexes" boolean - if (reader == null) + SSTableReader reader = null; + if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = findDesc(generation, cfs.getSSTables())) == null) { - RowIndexEntry.Serializer.skipPromotedIndex(input); + RowIndexEntry.Serializer.skip(input); return null; } - RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version); + RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version); - return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry)); + return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry)); } private SSTableReader findDesc(int generation, Collection<SSTableReader> collection) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java index 2020201,17553f3..075c8f7 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@@ -25,25 -25,22 +25,27 @@@ import java.net.InetAddress import java.net.UnknownHostException; import java.rmi.registry.LocateRegistry; import java.rmi.server.RMIServerSocketFactory; +import java.util.Collections; + import java.util.List; -import java.util.*; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; + import javax.management.MBeanServer; - import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.StandardMBean; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXServiceURL; import javax.management.remote.rmi.RMIConnectorServer; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistryListener; +import com.codahale.metrics.SharedMetricRegistries; import com.google.common.annotations.VisibleForTesting; --import com.google.common.collect.Iterables; + import com.google.common.util.concurrent.Futures; + import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.metrics.DefaultNameFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -353,29 -465,6 +376,29 @@@ public class CassandraDaemo return setupCompleted; } + private void logSystemInfo() + { + if (logger.isInfoEnabled()) + { + try + { + logger.info("Hostname: {}", InetAddress.getLocalHost().getHostName()); + } + catch (UnknownHostException e1) + { + logger.info("Could not resolve local host"); + } - ++ + logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version")); + logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory()); - ++ + for(MemoryPoolMXBean pool: ManagementFactory.getMemoryPoolMXBeans()) + logger.info("{} {}: {}", pool.getName(), pool.getType(), pool.getPeakUsage()); - ++ + logger.info("Classpath: {}", System.getProperty("java.class.path")); + } + } + /** * Initialize the Cassandra Daemon based on the given <a * href="http://commons.apache.org/daemon/jsvc.html">Commons http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 38736dc,431f163..fa370dc --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -683,15 -717,18 +683,22 @@@ public class StorageService extends Not }, "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); + replacing = DatabaseDescriptor.isReplacing(); + prepareToJoin(); + // Has to be called after the host id has potentially changed in prepareToJoin(). - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - if (cfs.metadata.isCounter()) - cfs.initCounterCache(); + try + { + CacheService.instance.counterCache.loadSavedAsync().get(); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + logger.warn("Error loading counter cache", t); + } + + if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))) { joinTokenRing(delay); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java index fe26616,63f89a4..bfcfa59 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@@ -24,17 -24,14 +24,17 @@@ package org.apache.cassandra.cache import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; --import java.util.UUID; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.db.ArrayBackedSortedColumns; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.SimpleStrategy; + import org.apache.cassandra.utils.Pair; import com.googlecode.concurrentlinkedhashmap.Weighers; @@@ -132,17 -119,16 +132,16 @@@ public class CacheProviderTes @Test public void testKeys() { - UUID cfId = UUID.randomUUID(); - - Pair<String, String> ksAndCFName = Pair.create(keyspaceName, cfName); ++ Pair<String, String> ksAndCFName = Pair.create(KEYSPACE1, CF_STANDARD1); byte[] b1 = {1, 2, 3, 4}; - RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1)); + RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1)); byte[] b2 = {1, 2, 3, 4}; - RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2)); + RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2)); assertEquals(key1, key2); assertEquals(key1.hashCode(), key2.hashCode()); - + byte[] b3 = {1, 2, 3, 5}; - RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3)); + RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3)); assertNotSame(key1, key3); assertNotSame(key1.hashCode(), key3.hashCode()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java index 0000000,0e879e9..1a60d6d mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java +++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java @@@ -1,0 -1,263 +1,266 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.cql3; + + import java.util.ArrayList; ++import java.util.Iterator; + import java.util.List; + + import org.junit.Assert; + import org.junit.Test; + + import com.google.common.collect.ImmutableSet; -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.MetricName; + + import org.apache.cassandra.cache.KeyCacheKey; + import org.apache.cassandra.config.Schema; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.metrics.CacheMetrics; ++import org.apache.cassandra.metrics.CassandraMetricsRegistry; + import org.apache.cassandra.service.CacheService; + import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.utils.Pair; + + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertNull; + + public class KeyCacheCqlTest extends CQLTester + { + + static final String commonColumnsDef = + "part_key_a int," + + "part_key_b text," + + "clust_key_a int," + + "clust_key_b text," + + "clust_key_c frozen<list<text>>," + // to make it really big + "col_text text," + + "col_int int," + + "col_long bigint,"; + static final String commonColumns = + "part_key_a," + + "part_key_b," + + "clust_key_a," + + "clust_key_b," + + "clust_key_c," + // to make it really big + "col_text," + + "col_int," + + "col_long"; + + @Test + public void test2iKeyCachePaths() throws Throwable + { + String table = createTable("CREATE TABLE %s (" + + commonColumnsDef + + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))"); + createIndex("CREATE INDEX some_index ON %s (col_int)"); + insertData(table, "some_index", true); + clearCache(); + + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + assertEquals(500, result.size()); + } + - long hits = metrics.hits.count(); - long requests = metrics.requests.count(); ++ long hits = metrics.hits.getCount(); ++ long requests = metrics.requests.getCount(); + assertEquals(4900, hits); + assertEquals(5250, requests); + + // + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + metrics = CacheService.instance.keyCache.getMetrics(); - hits = metrics.hits.count(); - requests = metrics.requests.count(); ++ hits = metrics.hits.getCount(); ++ requests = metrics.requests.getCount(); + assertEquals(10000, hits); + assertEquals(10500, requests); + + CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); + + int beforeSize = CacheService.instance.keyCache.size(); + + CacheService.instance.keyCache.clear(); + + Assert.assertEquals(0, CacheService.instance.keyCache.size()); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + + assertEquals(beforeSize, CacheService.instance.keyCache.size()); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + //Test Schema.getColumnFamilyStoreIncludingIndexes, several null check paths + //are defensive and unreachable + assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo", "bar"))); + assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE, "bar"))); + + dropTable("DROP TABLE %s"); + + //Test loading for a dropped 2i/table + CacheService.instance.keyCache.clear(); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + + assertEquals(0, CacheService.instance.keyCache.size()); + } + + @Test + public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable + { + String table = createTable("CREATE TABLE %s (" + + commonColumnsDef + + "PRIMARY KEY ((part_key_a, part_key_b),clust_key_a,clust_key_b,clust_key_c))"); + createIndex("CREATE INDEX some_index ON %s (col_int)"); + insertData(table, "some_index", true); + clearCache(); + + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + assertEquals(500, result.size()); + } + - long hits = metrics.hits.count(); - long requests = metrics.requests.count(); ++ long hits = metrics.hits.getCount(); ++ long requests = metrics.requests.getCount(); + assertEquals(4900, hits); + assertEquals(5250, requests); + + // + + for (int i = 0; i < 10; i++) + { + UntypedResultSet result = execute("SELECT part_key_a FROM %s WHERE col_int = ?", i); + // 100 part-keys * 50 clust-keys + // indexed on part-key % 10 = 10 index partitions + // (50 clust-keys * 100-part-keys / 10 possible index-values) = 500 + assertEquals(500, result.size()); + } + + metrics = CacheService.instance.keyCache.getMetrics(); - hits = metrics.hits.count(); - requests = metrics.requests.count(); ++ hits = metrics.hits.getCount(); ++ requests = metrics.requests.getCount(); + assertEquals(10000, hits); + assertEquals(10500, requests); + + dropTable("DROP TABLE %s"); + + CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get(); + + CacheService.instance.keyCache.clear(); + + Assert.assertEquals(0, CacheService.instance.keyCache.size()); + + // then load saved + CacheService.instance.keyCache.loadSaved(); + - for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet()) ++ Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator(); ++ while(iter.hasNext()) + { ++ KeyCacheKey key = iter.next(); + Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE")); + Assert.assertFalse(key.ksAndCFName.right.startsWith(table)); + } + } + + // Inserts 100 partitions split over 10 sstables (flush after 10 partitions). + // Clustered tables receive 50 CQL rows per partition. + private void insertData(String table, String index, boolean withClustering) throws Throwable + { + StorageService.instance.disableAutoCompaction(KEYSPACE, table); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get(); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking(); + if (index != null) + { + StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index); + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush(); + } + + for (int i = 0; i < 100; i++) + { + int partKeyA = i; + String partKeyB = Integer.toOctalString(i); + for (int c = 0; c < (withClustering ? 50 : 1); c++) + { + int clustKeyA = c; + String clustKeyB = Integer.toOctalString(c); + List<String> clustKeyC = makeList(clustKeyB); + String colText = String.valueOf(i) + '-' + String.valueOf(c); + int colInt = i % 10; + long colLong = c; + execute("INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + partKeyA, partKeyB, + clustKeyA, clustKeyB, clustKeyC, + colText, colInt, colLong); + } + + if (i % 10 == 9) + { + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get(); + if (index != null) + Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush(); + } + } + } + + private static List<String> makeList(String value) + { + List<String> list = new ArrayList<>(50); + for (int i = 0; i < 50; i++) + { + list.add(value + i); + } + return list; + } + + private static void clearCache() + { - for (MetricName name : ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet())) ++ for (String name : ImmutableSet.copyOf(CassandraMetricsRegistry.Metrics.getMetrics().keySet())) + { - Metrics.defaultRegistry().removeMetric(name); ++ CassandraMetricsRegistry.Metrics.remove(name); + } ++ + CacheService.instance.keyCache.clear(); + CacheMetrics metrics = CacheService.instance.keyCache.getMetrics(); - Assert.assertEquals(0, metrics.entries.value().intValue()); - Assert.assertEquals(0L, metrics.hits.count()); - Assert.assertEquals(0L, metrics.requests.count()); - Assert.assertEquals(0L, metrics.size.value().longValue()); ++ Assert.assertEquals(0, metrics.entries.getValue().intValue()); ++ Assert.assertEquals(0L, metrics.hits.getCount()); ++ Assert.assertEquals(0L, metrics.requests.getCount()); ++ Assert.assertEquals(0L, metrics.size.getValue().longValue()); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/CounterCacheTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java index 71f8b20,20e067c..5b37b2c --- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java +++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java @@@ -24,11 -23,8 +24,12 @@@ import org.junit.BeforeClass import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.exceptions.ConfigurationException; + import org.apache.cassandra.config.Schema; import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.FBUtilities; @@@ -62,7 -48,8 +63,8 @@@ public class CounterCacheTes @Test public void testReadWrite() { - ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); + cfs.truncateBlocking(); CacheService.instance.invalidateCounterCache(); assertEquals(0, CacheService.instance.counterCache.size()); @@@ -86,7 -73,8 +88,8 @@@ @Test public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException { - ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); + cfs.truncateBlocking(); CacheService.instance.invalidateCounterCache(); ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); @@@ -108,4 -96,69 +111,69 @@@ assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1))); assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2))); } + + @Test + public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException + { - ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); + cfs.truncateBlocking(); + CacheService.instance.invalidateCounterCache(); + + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros())); + cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros())); - new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); - new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply(); ++ new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply(); ++ new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply(); + + // flush the counter cache and invalidate + CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get(); + CacheService.instance.invalidateCounterCache(); + assertEquals(0, CacheService.instance.counterCache.size()); + - Keyspace ks = Schema.instance.removeKeyspaceInstance(KS); ++ Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1); + + try + { + // load from cache and validate + CacheService.instance.counterCache.loadSaved(); + assertEquals(0, CacheService.instance.counterCache.size()); + } + finally + { + Schema.instance.storeKeyspaceInstance(ks); + } + } + + @Test + public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException + { - ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF); ++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF); + cfs.truncateBlocking(); + CacheService.instance.invalidateCounterCache(); + + ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata); + cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros())); + cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros())); - new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply(); - new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply(); ++ new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply(); ++ new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply(); + + // flush the counter cache and invalidate + CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get(); + CacheService.instance.invalidateCounterCache(); + assertEquals(0, CacheService.instance.counterCache.size()); + + + CacheService.instance.setCounterCacheCapacityInMB(0); + try + { + // load from cache and validate + CacheService.instance.counterCache.loadSaved(); + assertEquals(0, CacheService.instance.counterCache.size()); + } + finally + { + CacheService.instance.setCounterCacheCapacityInMB(1); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java index a4b7514,6d4554d..5912d7c --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@@ -28,9 -27,8 +28,10 @@@ import org.junit.Test import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.RowCacheKey; +import org.apache.cassandra.config.KSMetaData; + import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; @@@ -177,6 -158,42 +178,42 @@@ public class RowCacheTes rowCacheLoad(100, 50, 0); CacheService.instance.setRowCacheCapacityInMB(0); } + + @Test + public void testRowCacheDropSaveLoad() throws Exception + { + CacheService.instance.setRowCacheCapacityInMB(1); + rowCacheLoad(100, 50, 0); + CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get(); - Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE); ++ Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE_CACHED); + try + { + CacheService.instance.rowCache.size(); + CacheService.instance.rowCache.clear(); + CacheService.instance.rowCache.loadSaved(); + int after = CacheService.instance.rowCache.size(); + assertEquals(0, after); + } + finally + { + Schema.instance.storeKeyspaceInstance(instance); + } + } + + @Test + public void testRowCacheDisabled() throws Exception + { + CacheService.instance.setRowCacheCapacityInMB(1); + rowCacheLoad(100, 50, 0); + CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get(); + CacheService.instance.setRowCacheCapacityInMB(0); + CacheService.instance.rowCache.size(); + CacheService.instance.rowCache.clear(); + CacheService.instance.rowCache.loadSaved(); + int after = CacheService.instance.rowCache.size(); + assertEquals(0, after); + } + @Test public void testRowCacheRange() { @@@ -195,8 -212,8 +232,8 @@@ ByteBuffer key = ByteBufferUtil.bytes("rowcachekey"); DecoratedKey dk = cachedStore.partitioner.decorateKey(key); - RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk); + RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk); - Mutation mutation = new Mutation(KEYSPACE, key); + Mutation mutation = new Mutation(KEYSPACE_CACHED, key); for (int i = 0; i < 200; i++) mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis()); mutation.applyUnsafe(); @@@ -271,7 -288,7 +308,7 @@@ // empty the cache again to make sure values came from disk CacheService.instance.invalidateRowCache(); - assert CacheService.instance.rowCache.size() == 0; - assert CacheService.instance.rowCache.loadSaved() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave); + assertEquals(0, CacheService.instance.rowCache.size()); - assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store)); ++ assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved()); } }