http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index f427389,0000000..5c51fbb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -1,261 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are 
created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept 
by ColumnFamilyStore post-start instead.
 + */
 +public class BigTableReader extends SSTableReader
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(BigTableReader.class);
 +
 +    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData 
metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata 
sstableMetadata, OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner, maxDataAge, 
sstableMetadata, openReason);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> 
columns)
 +    {
 +        return new SSTableNamesIterator(this, key, columns);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, 
SortedSet<CellName> columns, RowIndexEntry indexEntry )
 +    {
 +        return new SSTableNamesIterator(this, input, key, columns, 
indexEntry);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] 
slices, boolean reverse)
 +    {
 +        return new SSTableSliceIterator(this, key, slices, reverse);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, 
ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
 +    {
 +        return new SSTableSliceIterator(this, input, key, slices, reverse, 
indexEntry);
 +    }
 +    /**
 +     * Returns a scanner over this SSTable, obtained from BigTableScanner for the given data range.
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange, RateLimiter 
limiter)
 +    {
 +        return BigTableScanner.getScanner(this, dataRange, limiter);
 +    }
 +
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of 
tokens.
 +     *
 +     * @param ranges the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, 
RateLimiter limiter)
 +    {
 +        return BigTableScanner.getScanner(this, ranges, limiter);
 +    }
 +
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' 
key is allowed to
 +     * allow key selection by token bounds but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the 
target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key 
is not present
 +     */
 +    protected RowIndexEntry getPosition(RowPosition key, Operator op, boolean 
updateCacheAndStats, boolean permitMatchPastLast)
 +    {
 +        if (op == Operator.EQ)
 +        {
 +            assert key instanceof DecoratedKey; // EQ only make sense if the 
key is a valid row key
 +            if (!bf.isPresent((DecoratedKey)key))
 +            {
 +                Tracing.trace("Bloom filter allows skipping sstable {}", 
descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        // next, the key cache (only makes sense for a valid row key)
 +        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof 
DecoratedKey))
 +        {
 +            DecoratedKey decoratedKey = (DecoratedKey)key;
-             KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, 
decoratedKey.getKey());
++            KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, 
descriptor, decoratedKey.getKey());
 +            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, 
updateCacheAndStats);
 +            if (cachedPosition != null)
 +            {
 +                Tracing.trace("Key cache hit for sstable {}", 
descriptor.generation);
 +                return cachedPosition;
 +            }
 +        }
 +
 +        // check the smallest and greatest keys in the sstable to see if it 
can't be present
 +        boolean skip = false;
 +        if (key.compareTo(first) < 0)
 +        {
 +            if (op == Operator.EQ)
 +                skip = true;
 +            else
 +                key = first;
 +
 +            op = Operator.EQ;
 +        }
 +        else
 +        {
 +            int l = last.compareTo(key);
 +            // l <= 0  => we may be looking past the end of the file; we then 
narrow our behaviour to:
 +            //             1) skipping if strictly greater for GE and EQ;
 +            //             2) skipping if equal and searching GT, and we 
aren't permitting matching past last
 +            skip = l <= 0 && (l < 0 || (!permitMatchPastLast && op == 
Operator.GT));
 +        }
 +        if (skip)
 +        {
 +            if (op == Operator.EQ && updateCacheAndStats)
 +                bloomFilterTracker.addFalsePositive();
 +            Tracing.trace("Check against min and max keys allows skipping 
sstable {}", descriptor.generation);
 +            return null;
 +        }
 +
 +        int binarySearchResult = indexSummary.binarySearch(key);
 +        long sampledPosition = 
getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
 +        int sampledIndex = 
getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
 +
 +        int effectiveInterval = 
indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
 +
 +        if (ifile == null)
 +            return null;
 +
 +        // scan the on-disk index, starting at the nearest sampled position.
 +        // The check against IndexInterval is to exit the loop in the EQ 
case when the key looked for is not present
 +        // (bloom filter false positive). But note that for non-EQ cases, we 
might need to check the first key of the
 +        // next index position because the searched key can be greater than the 
last key of the index interval checked if it
 +        // is less than the first key of the next interval (and in that case we 
must return the position of the first key
 +        // of the next interval).
 +        int i = 0;
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            String path = null;
 +            try (FileDataInput in = segments.next())
 +            {
 +                path = in.getPath();
 +                while (!in.isEOF())
 +                {
 +                    i++;
 +
 +                    ByteBuffer indexKey = 
ByteBufferUtil.readWithShortLength(in);
 +
 +                    boolean opSatisfied; // did we find an appropriate 
position for the op requested
 +                    boolean exactMatch; // is the current position an exact 
match for the key, suitable for caching
 +
 +                    // Compare raw keys if possible for performance, 
otherwise compare decorated keys.
 +                    if (op == Operator.EQ && i <= effectiveInterval)
 +                    {
 +                        opSatisfied = exactMatch = 
indexKey.equals(((DecoratedKey) key).getKey());
 +                    }
 +                    else
 +                    {
 +                        DecoratedKey indexDecoratedKey = 
partitioner.decorateKey(indexKey);
 +                        int comparison = indexDecoratedKey.compareTo(key);
 +                        int v = op.apply(comparison);
 +                        opSatisfied = (v == 0);
 +                        exactMatch = (comparison == 0);
 +                        if (v < 0)
 +                        {
 +                            Tracing.trace("Partition index lookup allows 
skipping sstable {}", descriptor.generation);
 +                            return null;
 +                        }
 +                    }
 +
 +                    if (opSatisfied)
 +                    {
 +                        // read data position from index entry
 +                        RowIndexEntry indexEntry = 
rowIndexEntrySerializer.deserialize(in, descriptor.version);
 +                        if (exactMatch && updateCacheAndStats)
 +                        {
 +                            assert key instanceof DecoratedKey; // key can be 
== to the index key only if it's a true row key
 +                            DecoratedKey decoratedKey = (DecoratedKey)key;
 +
 +                            if (logger.isTraceEnabled())
 +                            {
 +                                // expensive sanity check!  see CASSANDRA-4687
 +                                try (FileDataInput fdi = 
dfile.getSegment(indexEntry.position))
 +                                {
 +                                    DecoratedKey keyInDisk = 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
 +                                    if (!keyInDisk.equals(key))
 +                                        throw new 
AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
 +                                }
 +                            }
 +
 +                            // store exact match for the key
 +                            cacheKey(decoratedKey, indexEntry);
 +                        }
 +                        if (op == Operator.EQ && updateCacheAndStats)
 +                            bloomFilterTracker.addTruePositive();
 +                        Tracing.trace("Partition index with {} entries found 
for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
 +                        return indexEntry;
 +                    }
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, path);
 +            }
 +        }
 +
 +        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
 +            bloomFilterTracker.addFalsePositive();
 +        Tracing.trace("Partition index lookup complete (bloom filter false 
positive) for sstable {}", descriptor.generation);
 +        return null;
 +    }
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a775627,50d8903..a13a52d
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -267,9 -282,9 +266,9 @@@ public class CacheService implements Ca
          keyCache.clear();
      }
  
-     public void invalidateKeyCacheForCf(UUID cfId)
+     public void invalidateKeyCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<KeyCacheKey> keyCacheIterator = 
keyCache.getKeySet().iterator();
 +        Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator();
          while (keyCacheIterator.hasNext())
          {
              KeyCacheKey key = keyCacheIterator.next();
@@@ -283,9 -298,9 +282,9 @@@
          rowCache.clear();
      }
  
-     public void invalidateRowCacheForCf(UUID cfId)
+     public void invalidateRowCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<RowCacheKey> rowCacheIterator = 
rowCache.getKeySet().iterator();
 +        Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator();
          while (rowCacheIterator.hasNext())
          {
              RowCacheKey rowCacheKey = rowCacheIterator.next();
@@@ -294,9 -309,9 +293,9 @@@
          }
      }
  
-     public void invalidateCounterCacheForCf(UUID cfId)
+     public void invalidateCounterCacheForCf(Pair<String, String> ksAndCFName)
      {
 -        Iterator<CounterCacheKey> counterCacheIterator = 
counterCache.getKeySet().iterator();
 +        Iterator<CounterCacheKey> counterCacheIterator = 
counterCache.keyIterator();
          while (counterCacheIterator.hasNext())
          {
              CounterCacheKey counterCacheKey = counterCacheIterator.next();
@@@ -423,7 -487,7 +435,7 @@@
              ByteBufferUtil.writeWithLength(key.key, out);
              out.writeInt(key.desc.generation);
              out.writeBoolean(true);
-             key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, 
out);
 -            
cfs.metadata.comparator.rowIndexEntrySerializer().serialize(entry, out);
++            
key.desc.getFormat().getIndexSerializer(cfs.metadata).serialize(entry, out);
          }
  
          public Future<Pair<KeyCacheKey, RowIndexEntry>> 
deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@@ -436,15 -502,15 +450,15 @@@
              }
              ByteBuffer key = ByteBufferUtil.read(input, keyLength);
              int generation = input.readInt();
-             SSTableReader reader = findDesc(generation, cfs.getSSTables());
              input.readBoolean(); // backwards compatibility for "promoted 
indexes" boolean
-             if (reader == null)
+             SSTableReader reader = null;
+             if (cfs == null || !cfs.isKeyCacheEnabled() || (reader = 
findDesc(generation, cfs.getSSTables())) == null)
              {
-                 RowIndexEntry.Serializer.skipPromotedIndex(input);
+                 RowIndexEntry.Serializer.skip(input);
                  return null;
              }
 -            RowIndexEntry entry = 
reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, 
reader.descriptor.version);
 +            RowIndexEntry entry = 
reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input,
 reader.descriptor.version);
-             return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
+             return Futures.immediateFuture(Pair.create(new 
KeyCacheKey(cfs.metadata.ksAndCFName, reader.descriptor, key), entry));
          }
  
          private SSTableReader findDesc(int generation, 
Collection<SSTableReader> collection)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 2020201,17553f3..075c8f7
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -25,25 -25,22 +25,27 @@@ import java.net.InetAddress
  import java.net.UnknownHostException;
  import java.rmi.registry.LocateRegistry;
  import java.rmi.server.RMIServerSocketFactory;
 +import java.util.Collections;
+ import java.util.List;
 -import java.util.*;
 +import java.util.Map;
 +import java.util.UUID;
  import java.util.concurrent.TimeUnit;
+ 
  import javax.management.MBeanServer;
- import javax.management.MalformedObjectNameException;
  import javax.management.ObjectName;
  import javax.management.StandardMBean;
  import javax.management.remote.JMXConnectorServer;
  import javax.management.remote.JMXServiceURL;
  import javax.management.remote.rmi.RMIConnectorServer;
  
 +import com.codahale.metrics.Meter;
 +import com.codahale.metrics.MetricRegistryListener;
 +import com.codahale.metrics.SharedMetricRegistries;
  import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.Futures;
+ import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.Uninterruptibles;
 +import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -353,29 -465,6 +376,29 @@@ public class CassandraDaemo
          return setupCompleted;
      }
  
 +    private void logSystemInfo()
 +    {
 +      if (logger.isInfoEnabled())
 +      {
 +              try
 +              {
 +                  logger.info("Hostname: {}", 
InetAddress.getLocalHost().getHostName());
 +              }
 +              catch (UnknownHostException e1)
 +              {
 +                  logger.info("Could not resolve local host");
 +              }
-       
++
 +              logger.info("JVM vendor/version: {}/{}", 
System.getProperty("java.vm.name"), System.getProperty("java.version"));
 +              logger.info("Heap size: {}/{}", 
Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
-       
++
 +              for(MemoryPoolMXBean pool: 
ManagementFactory.getMemoryPoolMXBeans())
 +                  logger.info("{} {}: {}", pool.getName(), pool.getType(), 
pool.getPeakUsage());
-       
++
 +              logger.info("Classpath: {}", 
System.getProperty("java.class.path"));
 +      }
 +    }
 +
      /**
       * Initialize the Cassandra Daemon based on the given <a
      * href="http://commons.apache.org/daemon/jsvc.html">Commons

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 38736dc,431f163..fa370dc
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -683,15 -717,18 +683,22 @@@ public class StorageService extends Not
          }, "StorageServiceShutdownHook");
          Runtime.getRuntime().addShutdownHook(drainOnShutdown);
  
 +        replacing = DatabaseDescriptor.isReplacing();
 +
          prepareToJoin();
  
 +        // Has to be called after the host id has potentially changed in 
prepareToJoin().
-         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-             if (cfs.metadata.isCounter())
-                 cfs.initCounterCache();
+         try
+         {
+             CacheService.instance.counterCache.loadSavedAsync().get();
+         }
+         catch (Throwable t)
+         {
+             JVMStabilityInspector.inspectThrowable(t);
+             logger.warn("Error loading counter cache", t);
+         }
+ 
 +
          if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", 
"true")))
          {
              joinTokenRing(delay);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index fe26616,63f89a4..bfcfa59
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@@ -24,17 -24,14 +24,17 @@@ package org.apache.cassandra.cache
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;
--import java.util.UUID;
  
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.ArrayBackedSortedColumns;
  import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.utils.Pair;
  
  import com.googlecode.concurrentlinkedhashmap.Weighers;
  
@@@ -132,17 -119,16 +132,16 @@@ public class CacheProviderTes
      @Test
      public void testKeys()
      {
-         UUID cfId = UUID.randomUUID();
- 
 -        Pair<String, String> ksAndCFName = Pair.create(keyspaceName, cfName);
++        Pair<String, String> ksAndCFName = Pair.create(KEYSPACE1, 
CF_STANDARD1);
          byte[] b1 = {1, 2, 3, 4};
-         RowCacheKey key1 = new RowCacheKey(cfId, ByteBuffer.wrap(b1));
+         RowCacheKey key1 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b1));
          byte[] b2 = {1, 2, 3, 4};
-         RowCacheKey key2 = new RowCacheKey(cfId, ByteBuffer.wrap(b2));
+         RowCacheKey key2 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b2));
          assertEquals(key1, key2);
          assertEquals(key1.hashCode(), key2.hashCode());
-         
+ 
          byte[] b3 = {1, 2, 3, 5};
-         RowCacheKey key3 = new RowCacheKey(cfId, ByteBuffer.wrap(b3));
+         RowCacheKey key3 = new RowCacheKey(ksAndCFName, ByteBuffer.wrap(b3));
          assertNotSame(key1, key3);
          assertNotSame(key1.hashCode(), key3.hashCode());
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
index 0000000,0e879e9..1a60d6d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
+++ b/test/unit/org/apache/cassandra/cql3/KeyCacheCqlTest.java
@@@ -1,0 -1,263 +1,266 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.cql3;
+ 
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.List;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.google.common.collect.ImmutableSet;
 -import com.yammer.metrics.Metrics;
 -import com.yammer.metrics.core.MetricName;
+ 
+ import org.apache.cassandra.cache.KeyCacheKey;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.metrics.CacheMetrics;
++import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+ import org.apache.cassandra.service.CacheService;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.Pair;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertNull;
+ 
+ public class KeyCacheCqlTest extends CQLTester
+ {
+ 
+     static final String commonColumnsDef =
+     "part_key_a     int," +
+     "part_key_b     text," +
+     "clust_key_a    int," +
+     "clust_key_b    text," +
+     "clust_key_c    frozen<list<text>>," + // to make it really big
+     "col_text       text," +
+     "col_int        int," +
+     "col_long       bigint,";
+     static final String commonColumns =
+     "part_key_a," +
+     "part_key_b," +
+     "clust_key_a," +
+     "clust_key_b," +
+     "clust_key_c," + // to make it really big
+     "col_text," +
+     "col_int," +
+     "col_long";
+ 
+     @Test
+     public void test2iKeyCachePaths() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                    + commonColumnsDef
+                                    + "PRIMARY KEY ((part_key_a, 
part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+ 
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s 
WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
 -        long hits = metrics.hits.count();
 -        long requests = metrics.requests.count();
++        long hits = metrics.hits.getCount();
++        long requests = metrics.requests.getCount();
+         assertEquals(4900, hits);
+         assertEquals(5250, requests);
+ 
+         //
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s 
WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) 
= 500
+             assertEquals(500, result.size());
+         }
+ 
+         metrics = CacheService.instance.keyCache.getMetrics();
 -        hits = metrics.hits.count();
 -        requests = metrics.requests.count();
++        hits = metrics.hits.getCount();
++        requests = metrics.requests.getCount();
+         assertEquals(10000, hits);
+         assertEquals(10500, requests);
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         int beforeSize = CacheService.instance.keyCache.size();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(beforeSize, CacheService.instance.keyCache.size());
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s 
WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) 
= 500
+             assertEquals(500, result.size());
+         }
+ 
+         //Test Schema.getColumnFamilyStoreIncludingIndexes, several null 
check paths
+         //are defensive and unreachable
+         
assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create("foo",
 "bar")));
+         
assertNull(Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(KEYSPACE,
 "bar")));
+ 
+         dropTable("DROP TABLE %s");
+ 
+         //Test loading for a dropped 2i/table
+         CacheService.instance.keyCache.clear();
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
+         assertEquals(0, CacheService.instance.keyCache.size());
+     }
+ 
+     @Test
+     public void test2iKeyCachePathsSaveKeysForDroppedTable() throws Throwable
+     {
+         String table = createTable("CREATE TABLE %s ("
+                                    + commonColumnsDef
+                                    + "PRIMARY KEY ((part_key_a, 
part_key_b),clust_key_a,clust_key_b,clust_key_c))");
+         createIndex("CREATE INDEX some_index ON %s (col_int)");
+         insertData(table, "some_index", true);
+         clearCache();
+ 
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s 
WHERE col_int = ?", i);
+             assertEquals(500, result.size());
+         }
+ 
 -        long hits = metrics.hits.count();
 -        long requests = metrics.requests.count();
++        long hits = metrics.hits.getCount();
++        long requests = metrics.requests.getCount();
+         assertEquals(4900, hits);
+         assertEquals(5250, requests);
+ 
+         //
+ 
+         for (int i = 0; i < 10; i++)
+         {
+             UntypedResultSet result = execute("SELECT part_key_a FROM %s 
WHERE col_int = ?", i);
+             // 100 part-keys * 50 clust-keys
+             // indexed on part-key % 10 = 10 index partitions
+             // (50 clust-keys  *  100-part-keys  /  10 possible index-values) 
= 500
+             assertEquals(500, result.size());
+         }
+ 
+         metrics = CacheService.instance.keyCache.getMetrics();
 -        hits = metrics.hits.count();
 -        requests = metrics.requests.count();
++        hits = metrics.hits.getCount();
++        requests = metrics.requests.getCount();
+         assertEquals(10000, hits);
+         assertEquals(10500, requests);
+ 
+         dropTable("DROP TABLE %s");
+ 
+         CacheService.instance.keyCache.submitWrite(Integer.MAX_VALUE).get();
+ 
+         CacheService.instance.keyCache.clear();
+ 
+         Assert.assertEquals(0, CacheService.instance.keyCache.size());
+ 
+         // then load saved
+         CacheService.instance.keyCache.loadSaved();
+ 
 -        for (KeyCacheKey key : CacheService.instance.keyCache.getKeySet())
++        Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
++        while(iter.hasNext())
+         {
++            KeyCacheKey key = iter.next();
+             Assert.assertFalse(key.ksAndCFName.left.equals("KEYSPACE"));
+             Assert.assertFalse(key.ksAndCFName.right.startsWith(table));
+         }
+     }
+ 
+     // Inserts 100 partitions split over 10 sstables (flush after 10 partitions).
+     // Clustered tables receive 50 CQL rows per partition.
+     private void insertData(String table, String index, boolean withClustering) throws Throwable
+     {
+         StorageService.instance.disableAutoCompaction(KEYSPACE, table);
+         Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+         Keyspace.open(KEYSPACE).getColumnFamilyStore(table).truncateBlocking();
+         if (index != null)
+         {
+             StorageService.instance.disableAutoCompaction(KEYSPACE, table + '.' + index);
+             Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+         }
+ 
+         for (int i = 0; i < 100; i++)
+         {
+             int partKeyA = i;
+             String partKeyB = Integer.toOctalString(i);
+             for (int c = 0; c < (withClustering ? 50 : 1); c++)
+             {
+                 int clustKeyA = c;
+                 String clustKeyB = Integer.toOctalString(c);
+                 List<String> clustKeyC = makeList(clustKeyB);
+                 String colText = String.valueOf(i) + '-' + String.valueOf(c);
+                 int colInt = i % 10;
+                 long colLong = c;
+                 execute("INSERT INTO %s (" + commonColumns + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+                         partKeyA, partKeyB,
+                         clustKeyA, clustKeyB, clustKeyC,
+                         colText, colInt, colLong);
+             }
+ 
+             if (i % 10 == 9)
+             {
+                 Keyspace.open(KEYSPACE).getColumnFamilyStore(table).forceFlush().get();
+                 if (index != null)
+                     Keyspace.open(KEYSPACE).getColumnFamilyStore(table).indexManager.getIndexesByNames(ImmutableSet.of(table + "." + index)).iterator().next().forceBlockingFlush();
+             }
+         }
+     }
+ 
+     private static List<String> makeList(String value)
+     {
+         List<String> list = new ArrayList<>(50);
+         for (int i = 0; i < 50; i++)
+         {
+             list.add(value + i);
+         }
+         return list;
+     }
+ 
+     private static void clearCache()
+     {
 -        for (MetricName name : ImmutableSet.copyOf(Metrics.defaultRegistry().allMetrics().keySet()))
++        for (String name : ImmutableSet.copyOf(CassandraMetricsRegistry.Metrics.getMetrics().keySet()))
+         {
 -            Metrics.defaultRegistry().removeMetric(name);
++            CassandraMetricsRegistry.Metrics.remove(name);
+         }
++
+         CacheService.instance.keyCache.clear();
+         CacheMetrics metrics = CacheService.instance.keyCache.getMetrics();
 -        Assert.assertEquals(0, metrics.entries.value().intValue());
 -        Assert.assertEquals(0L, metrics.hits.count());
 -        Assert.assertEquals(0L, metrics.requests.count());
 -        Assert.assertEquals(0L, metrics.size.value().longValue());
++        Assert.assertEquals(0, metrics.entries.getValue().intValue());
++        Assert.assertEquals(0L, metrics.hits.getCount());
++        Assert.assertEquals(0L, metrics.requests.getCount());
++        Assert.assertEquals(0L, metrics.size.getValue().longValue());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 71f8b20,20e067c..5b37b2c
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@@ -24,11 -23,8 +24,12 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.utils.FBUtilities;
  
@@@ -62,7 -48,8 +63,8 @@@ public class CounterCacheTes
      @Test
      public void testReadWrite()
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
          assertEquals(0, CacheService.instance.counterCache.size());
@@@ -86,7 -73,8 +88,8 @@@
      @Test
      public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
      {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
          CacheService.instance.invalidateCounterCache();
  
          ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
@@@ -108,4 -96,69 +111,69 @@@
          assertEquals(ClockAndCount.create(1L, 1L), cfs.getCachedCounter(bytes(2), cellname(1)));
          assertEquals(ClockAndCount.create(1L, 2L), cfs.getCachedCounter(bytes(2), cellname(2)));
      }
+ 
+     @Test
+     public void testDroppedSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
+         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
 -        Keyspace ks = Schema.instance.removeKeyspaceInstance(KS);
++        Keyspace ks = Schema.instance.removeKeyspaceInstance(KEYSPACE1);
+ 
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(ks);
+         }
+     }
+ 
+     @Test
+     public void testDisabledSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
+     {
 -        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         CacheService.instance.invalidateCounterCache();
+ 
+         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+         cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
 -        new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
 -        new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(1), cells), ConsistencyLevel.ONE).apply();
++        new CounterMutation(new Mutation(KEYSPACE1, bytes(2), cells), ConsistencyLevel.ONE).apply();
+ 
+         // flush the counter cache and invalidate
+         CacheService.instance.counterCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.invalidateCounterCache();
+         assertEquals(0, CacheService.instance.counterCache.size());
+ 
+ 
+         CacheService.instance.setCounterCacheCapacityInMB(0);
+         try
+         {
+             // load from cache and validate
+             CacheService.instance.counterCache.loadSaved();
+             assertEquals(0, CacheService.instance.counterCache.size());
+         }
+         finally
+         {
+             CacheService.instance.setCounterCacheCapacityInMB(1);
+         }
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RowCacheTest.java
index a4b7514,6d4554d..5912d7c
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@@ -28,9 -27,8 +28,10 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.cache.CachingOptions;
  import org.apache.cassandra.cache.RowCacheKey;
 +import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.composites.*;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.filter.QueryFilter;
@@@ -177,6 -158,42 +178,42 @@@ public class RowCacheTes
          rowCacheLoad(100, 50, 0);
          CacheService.instance.setRowCacheCapacityInMB(0);
      }
+ 
+     @Test
+     public void testRowCacheDropSaveLoad() throws Exception
+     {
+         CacheService.instance.setRowCacheCapacityInMB(1);
+         rowCacheLoad(100, 50, 0);
+         CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
 -        Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE);
++        Keyspace instance = Schema.instance.removeKeyspaceInstance(KEYSPACE_CACHED);
+         try
+         {
+             CacheService.instance.rowCache.size();
+             CacheService.instance.rowCache.clear();
+             CacheService.instance.rowCache.loadSaved();
+             int after = CacheService.instance.rowCache.size();
+             assertEquals(0, after);
+         }
+         finally
+         {
+             Schema.instance.storeKeyspaceInstance(instance);
+         }
+     }
+ 
+     @Test
+     public void testRowCacheDisabled() throws Exception
+     {
+         CacheService.instance.setRowCacheCapacityInMB(1);
+         rowCacheLoad(100, 50, 0);
+         CacheService.instance.rowCache.submitWrite(Integer.MAX_VALUE).get();
+         CacheService.instance.setRowCacheCapacityInMB(0);
+         CacheService.instance.rowCache.size();
+         CacheService.instance.rowCache.clear();
+         CacheService.instance.rowCache.loadSaved();
+         int after = CacheService.instance.rowCache.size();
+         assertEquals(0, after);
+     }
+ 
      @Test
      public void testRowCacheRange()
      {
@@@ -195,8 -212,8 +232,8 @@@
  
          ByteBuffer key = ByteBufferUtil.bytes("rowcachekey");
          DecoratedKey dk = cachedStore.partitioner.decorateKey(key);
-         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.cfId, dk);
+         RowCacheKey rck = new RowCacheKey(cachedStore.metadata.ksAndCFName, dk);
 -        Mutation mutation = new Mutation(KEYSPACE, key);
 +        Mutation mutation = new Mutation(KEYSPACE_CACHED, key);
          for (int i = 0; i < 200; i++)
              mutation.add(cf, Util.cellname(i), ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
          mutation.applyUnsafe();
@@@ -271,7 -288,7 +308,7 @@@
  
          // empty the cache again to make sure values came from disk
          CacheService.instance.invalidateRowCache();
 -        assert CacheService.instance.rowCache.size() == 0;
 -        assert CacheService.instance.rowCache.loadSaved() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
 +        assertEquals(0, CacheService.instance.rowCache.size());
-         assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
++        assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved());
      }
  }

Reply via email to