Updated Branches:
  refs/heads/trunk 9f867ea4c -> 2bfea99dd

Revert "merge from 0.1"

This reverts commit 9f867ea4c20b40e2bcd07fb43dc888bd3601474a, reversing
changes made to 681e2dea7679e0008cf149afc2b01f4b150d006c.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bfea99d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bfea99d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bfea99d

Branch: refs/heads/trunk
Commit: 2bfea99ddb3451d5b0710cd0462980f56e776c70
Parents: 9f867ea
Author: Jonathan Ellis <[email protected]>
Authored: Wed Jun 27 11:14:09 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Jun 27 11:16:31 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java    |  144 ++++++++-------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  128 ++++++--------
 .../unit/org/apache/cassandra/db/RowCacheTest.java |    2 +-
 3 files changed, 134 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7eed2a0..41b8f5d 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,14 +7,13 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.cassandra.cache;
 
@@ -35,11 +34,12 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.Pair;
 
@@ -53,15 +53,19 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
     protected volatile ScheduledFuture<?> saveTask;
     protected final CacheService.CacheType cacheType;
 
-    public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType 
cacheType)
+    private CacheSerializer<K, V> cacheLoader;
+    private static final String CURRENT_VERSION = "b";
+
+    public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType 
cacheType, CacheSerializer<K, V> cacheloader)
     {
         super(cache);
         this.cacheType = cacheType;
+        this.cacheLoader = cacheloader;
     }
 
-    public File getCachePath(String ksName, String cfName)
+    public File getCachePath(String ksName, String cfName, String version)
     {
-        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, 
cacheType);
+        return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, 
cacheType, version);
     }
 
     public Writer getWriter(int keysToSave)
@@ -92,41 +96,51 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
         }
     }
 
-    public Set<DecoratedKey> readSaved(String ksName, String cfName)
+    public int loadSaved(ColumnFamilyStore cfs)
     {
-        File path = getCachePath(ksName, cfName);
-        Set<DecoratedKey> keys = new TreeSet<DecoratedKey>();
+        int count = 0;
+        long start = System.currentTimeMillis();
+        File path = getCachePath(cfs.table.name, cfs.columnFamily, null);
         if (path.exists())
         {
             DataInputStream in = null;
             try
             {
-                long start = System.currentTimeMillis();
+                logger.info(String.format("reading saved cache %s", path));
+                in = new DataInputStream(new BufferedInputStream(new 
FileInputStream(path)));
+                Set<ByteBuffer> keys = new HashSet<ByteBuffer>();
+                while (in.available() > 0)
+                {
+                    keys.add(ByteBufferUtil.readWithLength(in));
+                    count++;
+                }
+                cacheLoader.load(keys, cfs);
+            }
+            catch (Exception e)
+            {
+                logger.warn(String.format("error reading saved cache %s, keys 
loaded so far: %d", path.getAbsolutePath(), count), e);
+                return count;
+            }
+            finally
+            {
+                FileUtils.closeQuietly(in);
+            }
+        }
 
+        path = getCachePath(cfs.table.name, cfs.columnFamily, CURRENT_VERSION);
+        if (path.exists())
+        {
+            DataInputStream in = null;
+            try
+            {
                 logger.info(String.format("reading saved cache %s", path));
                 in = new DataInputStream(new BufferedInputStream(new 
FileInputStream(path)));
                 while (in.available() > 0)
                 {
-                    int size = in.readInt();
-                    byte[] bytes = new byte[size];
-                    in.readFully(bytes);
-                    ByteBuffer buffer = ByteBuffer.wrap(bytes);
-                    DecoratedKey key;
-                    try
-                    {
-                        key = 
StorageService.getPartitioner().decorateKey(buffer);
-                    }
-                    catch (Exception e)
-                    {
-                        logger.info(String.format("unable to read entry #%s 
from saved cache %s; skipping remaining entries",
-                                keys.size(), path.getAbsolutePath()), e);
-                        break;
-                    }
-                    keys.add(key);
+                    Pair<K, V> entry = cacheLoader.deserialize(in, cfs);
+                    put(entry.left, entry.right);
+                    count++;
                 }
-                if (logger.isDebugEnabled())
-                    logger.debug(String.format("completed reading (%d ms; %d 
keys) saved cache %s",
-                            System.currentTimeMillis() - start, keys.size(), 
path));
             }
             catch (Exception e)
             {
@@ -137,7 +151,10 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
                 FileUtils.closeQuietly(in);
             }
         }
-        return keys;
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("completed reading (%d ms; %d keys) 
saved cache %s",
+                    System.currentTimeMillis() - start, count, path));
+        return count;
     }
 
     public Future<?> submitWrite(int keysToSave)
@@ -158,22 +175,11 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
         }
     }
 
-    public int estimateSizeToSave(Set<K> keys)
-    {
-        int bytes = 0;
-
-        for (K key : keys)
-            bytes += key.serializedSize();
-
-        return bytes;
-    }
-
     public class Writer extends CompactionInfo.Holder
     {
         private final Set<K> keys;
         private final CompactionInfo info;
-        private final long estimatedTotalBytes;
-        private long bytesWritten;
+        private long keysWritten;
 
         protected Writer(int keysToSave)
         {
@@ -182,9 +188,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
             else
                 keys = hotKeySet(keysToSave);
 
-            // an approximation -- the keyset can change while saving
-            estimatedTotalBytes = estimateSizeToSave(keys);
-
             OperationType type;
             if (cacheType == CacheService.CacheType.KEY_CACHE)
                 type = OperationType.KEY_CACHE_SAVE;
@@ -196,15 +199,14 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
             info = new CompactionInfo(new CFMetaData("system", 
cacheType.toString(), null, null, null),
                                       type,
                                       0,
-                                      estimatedTotalBytes);
+                                      keys.size(),
+                                      "keys");
         }
 
         public CompactionInfo getCompactionInfo()
         {
-            long bytesWritten = this.bytesWritten;
-            // keyset can change in size, thus totalBytes can too
-            return info.forProgress(bytesWritten,
-                                    Math.max(bytesWritten, 
estimatedTotalBytes));
+            // keyset can change in size, thus total can too
+            return info.forProgress(keysWritten, Math.max(keysWritten, 
keys.size()));
         }
 
         public void saveCache() throws IOException
@@ -212,7 +214,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
             logger.debug("Deleting old {} files.", cacheType);
             deleteOldCacheFiles();
 
-            if (keys.size() == 0 || estimatedTotalBytes == 0)
+            if (keys.size() == 0 || keys.size() == 0)
             {
                 logger.debug("Skipping {} save, cache is empty.", cacheType);
                 return;
@@ -224,7 +226,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
 
             try
             {
-                for (CacheKey key : keys)
+                for (K key : keys)
                 {
                     Pair<String, String> path = key.getPathInfo();
                     SequentialWriter writer = writers.get(path);
@@ -234,9 +236,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
                         writer = tempCacheFile(path);
                         writers.put(path, writer);
                     }
-
-                    key.write(writer.stream);
-                    bytesWritten += key.serializedSize();
+                    cacheLoader.serialize(key, writer.stream);
+                    keysWritten++;
                 }
             }
             finally
@@ -251,7 +252,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends 
InstrumentingCache<K
                 SequentialWriter writer = info.getValue();
 
                 File tmpFile = new File(writer.getPath());
-                File cacheFile = getCachePath(path.left, path.right);
+                File cacheFile = getCachePath(path.left, path.right, 
CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
                 if (!tmpFile.renameTo(cacheFile))
@@ -263,13 +264,12 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
 
         private SequentialWriter tempCacheFile(Pair<String, String> pathInfo) 
throws IOException
         {
-            File path = getCachePath(pathInfo.left, pathInfo.right);
+            File path = getCachePath(pathInfo.left, pathInfo.right, 
CURRENT_VERSION);
             File tmpFile = File.createTempFile(path.getName(), null, 
path.getParentFile());
 
             return SequentialWriter.open(tmpFile, true);
         }
 
-
         private void deleteOldCacheFiles()
         {
             File savedCachesDir = new 
File(DatabaseDescriptor.getSavedCachesLocation());
@@ -283,8 +283,24 @@ public class AutoSavingCache<K extends CacheKey, V> 
extends InstrumentingCache<K
                         if (!file.delete())
                             logger.warn("Failed to delete {}", 
file.getAbsolutePath());
                     }
+
+                    if (file.isFile() && 
file.getName().endsWith(CURRENT_VERSION + ".db"))
+                    {
+                        if (!file.delete())
+                            logger.warn("Failed to delete {}", 
file.getAbsolutePath());
+                    }
                 }
             }
         }
     }
+
+    public interface CacheSerializer<K extends CacheKey, V>
+    {
+        void serialize(K key, DataOutput out) throws IOException;
+
+        Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore cfs) 
throws IOException;
+
+        @Deprecated
+        void load(Set<ByteBuffer> buffer, ColumnFamilyStore cfs);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 582c097..dc2459b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db;
 
 import java.io.File;
@@ -35,7 +34,6 @@ import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
@@ -43,7 +41,7 @@ import 
org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
@@ -66,19 +64,17 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.IntervalTree.Interval;
-import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.config.CFMetaData.Caching;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
-    private static Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     /*
      * maybeSwitchMemtable puts Memtable.getSortedContents on the writer 
executor.  When the write is complete,
-     * we turn the writer into an SSTableReader and add it to ssTables_ where 
it is available for reads.
+     * we turn the writer into an SSTableReader and add it to ssTables where 
it is available for reads.
      *
      * There are two other things that maybeSwitchMemtable does.
      * First, it puts the Memtable into memtablesPendingFlush, where it stays 
until the flush is complete
@@ -117,12 +113,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     private volatile int memtableSwitchCount = 0;
 
     /* This is used to generate the next index for a SSTable */
-    private AtomicInteger fileIndexGenerator = new AtomicInteger(0);
+    private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
 
-    private LatencyTracker readStats = new LatencyTracker();
-    private LatencyTracker writeStats = new LatencyTracker();
+    private final LatencyTracker readStats = new LatencyTracker();
+    private final LatencyTracker writeStats = new LatencyTracker();
 
     // counts of sstables accessed by reads
     private final EstimatedHistogram recentSSTablesPerRead = new 
EstimatedHistogram(35);
@@ -226,12 +222,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
-        Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == 
Caching.ROWS_ONLY
-                                       ? Collections.<DecoratedKey>emptySet()
-                                       : 
CacheService.instance.keyCache.readSaved(table.name, columnFamily);
-
         Directories.SSTableLister sstables = 
directories.sstableLister().skipCompacted(true).skipTemporary(true);
-        
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
savedKeys, data, metadata, this.partitioner));
+        
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
data, metadata, this.partitioner));
+        if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
+            CacheService.instance.keyCache.loadSaved(this);
 
         // compaction strategy should be created after the CFS has been 
prepared
         this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
@@ -323,7 +317,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             Descriptor desc = entry.getKey();
             generations.add(desc.generation);
             if (!desc.isCompatible())
-                throw new RuntimeException(String.format("Can't open 
incompatible SSTable! Current version %s, found file: %s", 
Descriptor.CURRENT_VERSION, desc));
+                throw new RuntimeException(String.format("Can't open 
incompatible SSTable! Current version %s, found file: %s", 
Descriptor.Version.CURRENT, desc));
         }
         Collections.sort(generations);
         int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0;
@@ -408,19 +402,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
         long start = System.currentTimeMillis();
 
-        AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = 
CacheService.instance.rowCache;
-
-        // results are sorted on read (via treeset) because there are few 
reads and many writes and reads only happen at startup
-        int cachedRowsRead = 0;
-        for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily))
-        {
-            ColumnFamily data = 
getTopLevelColumns(QueryFilter.getIdentityFilter(key, new 
QueryPath(columnFamily)),
-                                                   Integer.MIN_VALUE,
-                                                   true);
-            CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, 
key), data);
-            cachedRowsRead++;
-        }
-
+        int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this);
         if (cachedRowsRead > 0)
             logger.info(String.format("completed loading (%d ms; %d keys) row 
cache for %s.%s",
                         System.currentTimeMillis() - start,
@@ -466,7 +448,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
             if (!descriptor.isCompatible())
                 throw new RuntimeException(String.format("Can't open 
incompatible SSTable! Current version %s, found file: %s",
-                                                         
Descriptor.CURRENT_VERSION,
+                                                         
Descriptor.Version.CURRENT,
                                                          descriptor));
 
             Descriptor newDescriptor = new Descriptor(descriptor.version,
@@ -481,7 +463,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             SSTableReader reader;
             try
             {
-                reader = SSTableReader.open(newDescriptor, entry.getValue(), 
Collections.<DecoratedKey>emptySet(), data, metadata, partitioner);
+                reader = SSTableReader.open(newDescriptor, entry.getValue(), 
data, metadata, partitioner);
             }
             catch (IOException e)
             {
@@ -558,7 +540,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      * When the sstable object is closed, it will be renamed to a non-temporary
      * format, so incomplete sstables can be recognized and removed on startup.
      */
-    public String getFlushPath(long estimatedSize, String version)
+    public String getFlushPath(long estimatedSize, Descriptor.Version version)
     {
         File location = directories.getDirectoryForNewSSTables(estimatedSize);
         if (location == null)
@@ -566,7 +548,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return getTempSSTablePath(location, version);
     }
 
-    public String getTempSSTablePath(File directory, String version)
+    public String getTempSSTablePath(File directory, Descriptor.Version 
version)
     {
         Descriptor desc = new Descriptor(version,
                                          directory,
@@ -579,7 +561,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public String getTempSSTablePath(File directory)
     {
-        return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION);
+        return getTempSSTablePath(directory, Descriptor.Version.CURRENT);
     }
 
     /** flush the given memtable and swap in a new one for its CFS, if it 
hasn't been frozen already.  threadsafe. */
@@ -759,11 +741,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
     {
-        if (cf.getColumnCount() == 0 && (!cf.isMarkedForDelete() || 
cf.getLocalDeletionTime() < gcBefore))
-            return null;
-
         cf.maybeResetDeletionTimes(gcBefore);
-        return cf;
+        return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf;
     }
 
     /*
@@ -799,10 +778,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             IColumn c = iter.next();
             ByteBuffer cname = c.name();
             // remove columns if
-            // (a) the column itself is tombstoned or
-            // (b) the CF is tombstoned and the column is not newer than it
-            if (c.getLocalDeletionTime() < gcBefore
-                || c.timestamp() <= cf.getMarkedForDeleteAt())
+            // (a) the column itself is gcable or
+            // (b) the column is shadowed by a CF tombstone
+            if (c.getLocalDeletionTime() < gcBefore || 
cf.deletionInfo().isDeleted(c))
             {
                 iter.remove();
             }
@@ -818,28 +796,26 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         while (iter.hasNext())
         {
             SuperColumn c = (SuperColumn)iter.next();
-            long minTimestamp = Math.max(c.getMarkedForDeleteAt(), 
cf.getMarkedForDeleteAt());
             Iterator<IColumn> subIter = c.getSubColumns().iterator();
             while (subIter.hasNext())
             {
                 IColumn subColumn = subIter.next();
                 // remove subcolumns if
-                // (a) the subcolumn itself is tombstoned or
-                // (b) the supercolumn is tombstoned and the subcolumn is not 
newer than it
-                if (subColumn.timestamp() <= minTimestamp
-                    || subColumn.getLocalDeletionTime() < gcBefore)
+                // (a) the subcolumn itself is gcable or
+                // (b) the supercolumn is shadowed by the CF and the column is 
not newer
+                // (c) the subcolumn is shadowed by the supercolumn
+                if (subColumn.getLocalDeletionTime() < gcBefore
+                    || cf.deletionInfo().isDeleted(c.name(), 
subColumn.timestamp())
+                    || c.deletionInfo().isDeleted(subColumn))
                 {
                     subIter.remove();
                 }
             }
-            if (c.getSubColumns().isEmpty() && (!c.isMarkedForDelete() || 
c.getLocalDeletionTime() < gcBefore))
+            c.maybeResetDeletionTimes(gcBefore);
+            if (c.getSubColumns().isEmpty() && !c.isMarkedForDelete())
             {
                 iter.remove();
             }
-            else
-            {
-                c.maybeResetDeletionTimes(gcBefore);
-            }
         }
     }
 
@@ -855,12 +831,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         if (sstables.isEmpty())
             return ImmutableSet.of();
 
-        IntervalTree<SSTableReader> tree = data.getView().intervalTree;
+        DataTracker.SSTableIntervalTree tree = data.getView().intervalTree;
 
         Set<SSTableReader> results = null;
         for (SSTableReader sstable : sstables)
         {
-            Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new 
Interval<SSTableReader>(sstable.first, sstable.last)));
+            Set<SSTableReader> overlaps = 
ImmutableSet.copyOf(tree.search(Interval.<RowPosition, 
SSTableReader>create(sstable.first, sstable.last)));
             assert overlaps.contains(sstable);
             results = results == null ? overlaps : Sets.union(results, 
overlaps);
         }
@@ -1131,7 +1107,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      * @return the entire row for filter.key, if present in the cache (or we 
can cache it), or just the column
      *         specified by filter otherwise
      */
-    private ColumnFamily getThroughCache(Integer cfId, QueryFilter filter)
+    private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
     {
         assert isRowCacheEnabled()
                : String.format("Row cache is not enabled on column family [" + 
getColumnFamilyName() + "]");
@@ -1190,7 +1166,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 return cf.isSuper() ? removeDeleted(cf, gcBefore) : 
removeDeletedCF(cf, gcBefore);
             }
 
-            Integer cfId = Schema.instance.getId(table.name, 
this.columnFamily);
+            UUID cfId = Schema.instance.getId(table.name, this.columnFamily);
             if (cfId == null)
                 return null; // secondary index
 
@@ -1214,8 +1190,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, 
int gcBefore)
     {
         ColumnFamily cf = 
cached.cloneMeShallow(ArrayBackedSortedColumns.factory(), 
filter.filter.isReversed());
-        IColumnIterator ci = filter.getMemtableColumnIterator(cached, null);
-        filter.collateColumns(cf, Collections.singletonList(ci), gcBefore);
+        OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
+        filter.collateOnDiskAtom(cf, Collections.singletonList(ci), gcBefore);
         // TODO this is necessary because when we collate supercolumns 
together, we don't check
         // their subcolumns for relevance, so we need to do a second prune 
post facto here.
         return cf.isSuper() ? removeDeleted(cf, gcBefore) : 
removeDeletedCF(cf, gcBefore);
@@ -1265,7 +1241,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         while (true)
         {
             view = data.getView();
-            sstables = view.intervalTree.search(new Interval(key, key));
+            sstables = view.intervalTree.search(key);
             if (SSTableReader.acquireReferences(sstables))
                 break;
             // retry w/ new view
@@ -1285,9 +1261,17 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         {
             view = data.getView();
             // startAt == minimum is ok, but stopAt == minimum is confusing 
because all IntervalTree deals with
-            // is Comparable, so it won't know to special-case that.
-            Comparable stopInTree = stopAt.isMinimum() ? 
view.intervalTree.max() : stopAt;
-            sstables = view.intervalTree.search(new Interval(startWith, 
stopInTree));
+            // is Comparable, so it won't know to special-case that. However 
max() should not be called if the
+            // intervalTree is empty, so check that first
+            //
+            if (view.intervalTree.isEmpty())
+            {
+                sstables = Collections.<SSTableReader>emptyList();
+                break;
+            }
+
+            RowPosition stopInTree = stopAt.isMinimum() ? 
view.intervalTree.max() : stopAt;
+            sstables = view.intervalTree.search(Interval.<RowPosition, 
SSTableReader>create(startWith, stopInTree));
             if (SSTableReader.acquireReferences(sstables))
                 break;
             // retry w/ new view
@@ -1305,7 +1289,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             for (SSTableReader sstr : view.sstables)
             {
                 // check if the key actually exists in this sstable, without 
updating cache and stats
-                if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) > 
-1)
+                if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != 
null)
                     files.add(sstr.getFilename());
             }
             return files;
@@ -1315,7 +1299,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
-    private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
boolean forCache)
+    public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, 
boolean forCache)
     {
         CollationController controller = new CollationController(this, 
forCache, filter, gcBefore);
         ColumnFamily columns = controller.getTopLevelColumns();
@@ -1607,7 +1591,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public void invalidateCachedRow(DecoratedKey key)
     {
-        Integer cfId = Schema.instance.getId(table.name, this.columnFamily);
+        UUID cfId = Schema.instance.getId(table.name, this.columnFamily);
         if (cfId == null)
             return; // secondary index
 
@@ -1637,10 +1621,10 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return cfses;
     }
 
-    public Iterable<DecoratedKey<?>> keySamples(Range<Token> range)
+    public Iterable<DecoratedKey> keySamples(Range<Token> range)
     {
         Collection<SSTableReader> sstables = getSSTables();
-        Iterable<DecoratedKey<?>>[] samples = new Iterable[sstables.size()];
+        Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()];
         int i = 0;
         for (SSTableReader sstable: sstables)
         {
@@ -1903,12 +1887,6 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return partitioner instanceof LocalPartitioner;
     }
 
-    private String getParentColumnfamily()
-    {
-        assert isIndex();
-        return columnFamily.split("\\.")[0];
-    }
-
     private ByteBuffer intern(ByteBuffer name)
     {
         ByteBuffer internedName = internedNames.get(name);
@@ -1941,7 +1919,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public SSTableWriter createFlushWriter(long estimatedRows, long 
estimatedSize, ReplayPosition context) throws IOException
     {
         SSTableMetadata.Collector sstableMetadataCollector = 
SSTableMetadata.createCollector().replayPosition(context);
-        return new SSTableWriter(getFlushPath(estimatedSize, 
Descriptor.CURRENT_VERSION),
+        return new SSTableWriter(getFlushPath(estimatedSize, 
Descriptor.Version.CURRENT),
                                  estimatedRows,
                                  metadata,
                                  partitioner,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java 
b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index e6b4578..99b8cbc 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -153,6 +153,6 @@ public class RowCacheTest extends SchemaLoader
         // empty the cache again to make sure values came from disk
         CacheService.instance.invalidateRowCache();
         assert CacheService.instance.rowCache.size() == 0;
-        assert CacheService.instance.rowCache.readSaved(KEYSPACE, 
COLUMN_FAMILY).size() == (keysToSave == Integer.MAX_VALUE ? totalKeys : 
keysToSave);
+        assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave 
== Integer.MAX_VALUE ? totalKeys : keysToSave);
     }
 }

Reply via email to