Repository: cassandra
Updated Branches:
  refs/heads/trunk 7e27b55fd -> f22e775fb


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java 
b/src/java/org/apache/cassandra/service/FileCacheService.java
index d22763b..9f57995 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.cache.*;
 import org.slf4j.Logger;
@@ -41,36 +42,66 @@ public class FileCacheService
 
     public static FileCacheService instance = new FileCacheService();
 
-    private static final Callable<Queue<RandomAccessReader>> 
cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    private static final AtomicLong cacheKeyIdCounter = new AtomicLong();
+    /**
+     * Identity token used as the key type of the reader cache. Each instance takes the next
+     * value of {@code cacheKeyIdCounter} as its id; equality and hashing depend on that id alone.
+     */
+    public static final class CacheKey
+    {
+        final long id;
+
+        public CacheKey()
+        {
+            id = cacheKeyIdCounter.incrementAndGet();
+        }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            if (!(that instanceof CacheKey))
+                return false;
+            return id == ((CacheKey) that).id;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            // narrowing cast: only the low 32 bits of the id take part in the hash
+            return (int) id;
+        }
+    }
+
+    private static final Callable<CacheBucket> cacheForPathCreator = new 
Callable<CacheBucket>()
     {
         @Override
-        public Queue<RandomAccessReader> call()
+        public CacheBucket call()
         {
-            return new ConcurrentLinkedQueue<RandomAccessReader>();
+            return new CacheBucket();
         }
     };
 
     private static final AtomicInteger memoryUsage = new AtomicInteger();
 
-    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final Cache<CacheKey, CacheBucket> cache;
     private final FileCacheMetrics metrics = new FileCacheMetrics();
 
+    /**
+     * Value type of the reader cache: the queue of pooled readers for one key, together with a
+     * flag recording that the bucket has been removed from the cache.
+     */
+    private static final class CacheBucket
+    {
+        // starts out false; the removal listener sets it before draining the queue, and put()
+        // re-checks it after adding a reader
+        volatile boolean discarded;
+        final ConcurrentLinkedQueue<RandomAccessReader> queue = new ConcurrentLinkedQueue<>();
+    }
+
     protected FileCacheService()
     {
-        RemovalListener<String, Queue<RandomAccessReader>> onRemove = new 
RemovalListener<String, Queue<RandomAccessReader>>()
+        RemovalListener<CacheKey, CacheBucket> onRemove = new 
RemovalListener<CacheKey, CacheBucket>()
         {
             @Override
-            public void onRemoval(RemovalNotification<String, 
Queue<RandomAccessReader>> notification)
+            public void onRemoval(RemovalNotification<CacheKey, CacheBucket> 
notification)
             {
-                Queue<RandomAccessReader> cachedInstances = 
notification.getValue();
-                if (cachedInstances == null)
+                CacheBucket bucket = notification.getValue();
+                if (bucket == null)
                     return;
 
-                if (cachedInstances.size() > 0)
-                    logger.debug("Evicting cold readers for {}", 
cachedInstances.peek().getPath());
-
-                for (RandomAccessReader reader : cachedInstances)
+                // set discarded before deallocating the readers, to ensure we 
don't leak any
+                bucket.discarded = true;
+                Queue<RandomAccessReader> q = bucket.queue;
+                boolean first = true;
+                for (RandomAccessReader reader = q.poll() ; reader != null ; 
reader = q.poll())
                 {
+                    if (logger.isDebugEnabled() && first)
+                    {
+                        logger.debug("Evicting cold readers for {}", 
reader.getPath());
+                        first = false;
+                    }
                     memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
                     reader.deallocate();
                 }
@@ -81,15 +112,16 @@ public class FileCacheService
                 .expireAfterAccess(AFTER_ACCESS_EXPIRATION, 
TimeUnit.MILLISECONDS)
                 .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
                 .removalListener(onRemove)
+                .initialCapacity(16 << 10)
                 .build();
     }
 
-    public RandomAccessReader get(String path)
+    public RandomAccessReader get(CacheKey key)
     {
         metrics.requests.mark();
 
-        Queue<RandomAccessReader> instances = getCacheFor(path);
-        RandomAccessReader result = instances.poll();
+        CacheBucket bucket = getCacheFor(key);
+        RandomAccessReader result = bucket.queue.poll();
         if (result != null)
         {
             metrics.hits.mark();
@@ -99,11 +131,11 @@ public class FileCacheService
         return result;
     }
 
-    private Queue<RandomAccessReader> getCacheFor(String path)
+    private CacheBucket getCacheFor(CacheKey key)
     {
         try
         {
-            return cache.get(path, cacheForPathCreator);
+            return cache.get(key, cacheForPathCreator);
         }
         catch (ExecutionException e)
         {
@@ -111,34 +143,46 @@ public class FileCacheService
         }
     }
 
-    public void put(RandomAccessReader instance)
+    public void put(CacheKey cacheKey, RandomAccessReader instance)
     {
         int memoryUsed = memoryUsage.get();
         if (logger.isDebugEnabled())
             logger.debug("Estimated memory usage is {} compared to actual 
usage {}", memoryUsed, sizeInBytes());
 
-        if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+        CacheBucket bucket = cache.getIfPresent(cacheKey);
+        if (memoryUsed >= MEMORY_USAGE_THRESHOLD || bucket == null)
         {
             instance.deallocate();
         }
         else
         {
             memoryUsage.addAndGet(instance.getTotalBufferSize());
-            getCacheFor(instance.getPath()).add(instance);
+            bucket.queue.add(instance);
+            if (bucket.discarded)
+            {
+                RandomAccessReader reader = bucket.queue.poll();
+                if (reader != null)
+                {
+                    memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+                    reader.deallocate();
+                }
+            }
         }
     }
 
-    public void invalidate(String path)
+    public void invalidate(CacheKey cacheKey, String path)
     {
-        logger.debug("Invalidating cache for {}", path);
-        cache.invalidate(path);
+        if (logger.isDebugEnabled())
+            logger.debug("Invalidating cache for {}", path);
+        cache.invalidate(cacheKey);
     }
 
+    // TODO: this method is unsafe, as it calls getTotalBufferSize() on items 
that can have been discarded
     public long sizeInBytes()
     {
         long n = 0;
-        for (Queue<RandomAccessReader> queue : cache.asMap().values())
-            for (RandomAccessReader reader : queue)
+        for (CacheBucket bucket : cache.asMap().values())
+            for (RandomAccessReader reader : bucket.queue)
                 n += reader.getTotalBufferSize();
         return n;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java 
b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
index 0eb01c5..4d20479 100644
--- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -21,15 +21,20 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Encapsulates the behavior for 'locking' any streamed sttables to a node.
@@ -69,7 +74,7 @@ public class StreamLockfile
             /* write out the file names *without* the 'tmp-file' flag in the 
file name.
                this class will not need to clean up tmp files (on restart), 
CassandraDaemon does that already,
                just make sure we delete the fully-formed SSTRs. */
-            
sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename());
+            
sstablePaths.add(writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename());
         }
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java 
b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index e91f58f..78d4d9e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -117,17 +117,6 @@ public class StandaloneScrubber
                             scrubber.close();
                         }
 
-                        if (manifest != null)
-                        {
-                            if (scrubber.getNewInOrderSSTable() != null)
-                                manifest.add(scrubber.getNewInOrderSSTable());
-
-                            List<SSTableReader> added = 
scrubber.getNewSSTable() == null
-                                ? Collections.<SSTableReader>emptyList()
-                                : 
Collections.singletonList(scrubber.getNewSSTable());
-                            
manifest.replace(Collections.singletonList(sstable), added);
-                        }
-
                         // Remove the sstable (it's been copied by scrub and 
snapshotted)
                         sstable.markObsolete();
                         sstable.releaseReference();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java 
b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 8b92586..9353ce9 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -140,10 +140,6 @@ public class StandaloneSplitter
                 try
                 {
                     new SSTableSplitter(cfs, sstable, 
options.sizeInMB).split();
-
-                    // Remove the sstable
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java 
b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index a00245b..55f206e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -97,9 +97,6 @@ public class StandaloneUpgrader
                 {
                     Upgrader upgrader = new Upgrader(cfs, sstable, handler);
                     upgrader.upgrade();
-
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java 
b/src/java/org/apache/cassandra/utils/CLibrary.java
index ac9f863..1d3c014 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.utils;
 
 import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.lang.reflect.Field;
 
 import org.slf4j.Logger;
@@ -139,6 +142,54 @@ public final class CLibrary
         }
     }
 
+    /**
+     * Runs {@link #trySkipCache(int, long, long)} against the file at {@code path}.
+     * The file is opened read-only and stays open until that call returns, so the
+     * descriptor it receives is still open while it is being used. Best effort: a
+     * file that cannot be opened or closed is silently left alone.
+     *
+     * @param path   path of the file to open
+     * @param offset start of the region, in bytes
+     * @param len    length of the region, in bytes
+     */
+    public static void trySkipCache(String path, long offset, long len)
+    {
+        try (RandomAccessFile file = new RandomAccessFile(path, "r"))
+        {
+            trySkipCache(getfd(file.getFD()), offset, len);
+        }
+        catch (IOException | SecurityException e)
+        {
+            // deliberately ignored, like getfd(String), which also swallows failures to open the file
+        }
+    }
+
+    /**
+     * Runs {@link #trySkipCache(int, long, int)} over {@code [offset, offset + len)},
+     * in consecutive pieces of at most {@link Integer#MAX_VALUE} bytes since that
+     * overload takes an {@code int} length.
+     *
+     * @param fd     file descriptor passed through to the {@code int}-length overload
+     * @param offset start of the region, in bytes
+     * @param len    length of the region, in bytes; zero is forwarded as a single
+     *               {@code (fd, 0, 0)} call
+     */
+    public static void trySkipCache(int fd, long offset, long len)
+    {
+        // NOTE(review): offset is not forwarded here -- presumably (0, 0) means the whole file; confirm
+        if (len == 0)
+            trySkipCache(fd, 0, 0);
+
+        while (len > 0)
+        {
+            int sublen = (int) Math.min(Integer.MAX_VALUE, len);
+            trySkipCache(fd, offset, sublen);
+            len -= sublen;
+            // step forward to the start of the next piece
+            offset += sublen;
+        }
+    }
+
     public static void trySkipCache(int fd, long offset, int len)
     {
         if (fd < 0)
@@ -280,33 +302,30 @@ public final class CLibrary
         return -1;
     }
 
-    /**
-     * Suggest kernel to preheat one page for the given file.
-     *
-     * @param fd The file descriptor of file to preheat.
-     * @param position The offset of the block.
-     *
-     * @return On success, zero is returned. On error, an error number is 
returned.
-     */
-    public static int preheatPage(int fd, long position)
+    public static int getfd(String path)
     {
+        RandomAccessFile file = null;
         try
         {
-            // 4096 is good for SSD because they operate on "Pages" 4KB in size
-            return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED);
+            file = new RandomAccessFile(path, "r");
+            return getfd(file.getFD());
         }
-        catch (UnsatisfiedLinkError e)
+        catch (Throwable t)
         {
-            // JNA is unavailable just skipping
+            // ignore
+            return -1;
         }
-        catch (RuntimeException e)
+        finally
         {
-            if (!(e instanceof LastErrorException))
-                throw e;
-
-            logger.warn(String.format("posix_fadvise(%d, %d) failed, errno 
(%d).", fd, position, errno(e)));
+            try
+            {
+                if (file != null)
+                    file.close();
+            }
+            catch (Throwable t)
+            {
+                // ignore
+            }
         }
-
-        return -1;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java 
b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index de8da01..3007292 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.Memory;
 
@@ -42,7 +41,7 @@ public class OffHeapBitSet implements IBitSet
         try
         {
             long byteCount = wordCount * 8L;
-            bytes = RefCountedMemory.allocate(byteCount);
+            bytes = Memory.allocate(byteCount);
         }
         catch (OutOfMemoryError e)
         {
@@ -123,7 +122,7 @@ public class OffHeapBitSet implements IBitSet
     public static OffHeapBitSet deserialize(DataInput in) throws IOException
     {
         long byteCount = in.readInt() * 8L;
-        Memory memory = RefCountedMemory.allocate(byteCount);
+        Memory memory = Memory.allocate(byteCount);
         for (long i = 0; i < byteCount;)
         {
             long v = in.readLong();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git 
a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java 
b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index d68ba10..35c2b5e 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -99,7 +99,7 @@ public class LongCompactionsTest extends SchemaLoader
 
         long start = System.nanoTime();
         final int gcBefore = (int) (System.currentTimeMillis() / 1000) - 
Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
-        new CompactionTask(store, sstables, gcBefore).execute(null);
+        new CompactionTask(store, sstables, gcBefore, false).execute(null);
         System.out.println(String.format("%s: sstables=%d rowsper=%d 
colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b3f7429..d180b82 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -22,15 +22,26 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
@@ -41,28 +52,55 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.LexicalUUIDType;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableSimpleWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.junit.Assert.*;
-import static org.apache.cassandra.Util.*;
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.rp;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class ColumnFamilyStoreTest extends SchemaLoader
@@ -916,8 +954,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new 
Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", 
"Standard1", version, false);
-            Descriptor desc = new 
Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", 
version, false);
+            Descriptor existing = new 
Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", 
"Standard1", version, Descriptor.Type.FINAL);
+            Descriptor desc = new 
Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", 
version, Descriptor.Type.FINAL);
             for (Component c : new Component[]{ Component.DATA, 
Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("can not find backedup file:" + 
desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
         }
@@ -1697,7 +1735,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 MetadataCollector collector = new 
MetadataCollector(cfmeta.comparator);
                 for (int ancestor : ancestors)
                     collector.addAncestor(ancestor);
-                String file = new Descriptor(directory, ks, cf, 3, 
true).filenameFor(Component.DATA);
+                String file = new Descriptor(directory, ks, cf, 3, 
Descriptor.Type.TEMP).filenameFor(Component.DATA);
                 return new SSTableWriter(file,
                                          0,
                                          
ActiveRepairService.UNREPAIRED_SSTABLE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java 
b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 97cd21c..05e0beb 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -19,20 +19,25 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -99,7 +104,7 @@ public class DirectoriesTest
 
     private static void createFakeSSTable(File dir, String cf, int gen, 
boolean temp, List<File> addTo) throws IOException
     {
-        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp);
+        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? 
Descriptor.Type.TEMP : Descriptor.Type.FINAL);
         for (Component c : new Component[]{ Component.DATA, 
Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
@@ -122,7 +127,7 @@ public class DirectoriesTest
             Directories directories = new Directories(cfm);
             assertEquals(cfDir(cfm), 
directories.getDirectoryForCompactedSSTables());
 
-            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, 
false);
+            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, 
Descriptor.Type.FINAL);
             File snapshotDir = new File(cfDir(cfm),  File.separator + 
Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, 
"42"));
 
@@ -224,7 +229,7 @@ public class DirectoriesTest
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {
-                    Descriptor desc = new Descriptor(cfDir(cfm), KS, 
cfm.cfName, 1, false);
+                    Descriptor desc = new Descriptor(cfDir(cfm), KS, 
cfm.cfName, 1, Descriptor.Type.FINAL);
                     return Directories.getSnapshotDirectory(desc, n);
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java 
b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 6ca5487..c48a728 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.AfterClass;
@@ -30,7 +32,9 @@ import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -145,11 +149,22 @@ public class KeyCacheTest extends SchemaLoader
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
+        Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+        for (SSTableReader reader : readers)
+            reader.acquireReference();
+
         Util.compactAll(cfs, Integer.MAX_VALUE).get();
-        // after compaction cache should have entries for
-        // new SSTables, if we had 2 keys in cache previously it should become 
4
+        // after compaction cache should have entries for new SSTables,
+        // but since we have kept a reference to the old sstables,
+        // if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
 
+        for (SSTableReader reader : readers)
+            reader.releaseReference();
+
+        // after releasing the reference this should drop to 2
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+
         // re-read same keys to verify that key cache didn't grow further
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key1,
                                                        COLUMN_FAMILY1,
@@ -167,7 +182,7 @@ public class KeyCacheTest extends SchemaLoader
                                                        10,
                                                        
System.currentTimeMillis()));
 
-        assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
     }
 
     private void assertKeyCacheSize(int expected, String keyspace, String 
columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java 
b/test/unit/org/apache/cassandra/db/ScrubTest.java
index b8c7980..e820fc2 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db;
  *
  */
 
-import java.io.*;
-import java.util.Collections;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -29,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang3.StringUtils;
@@ -37,17 +38,18 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
@@ -112,7 +114,7 @@ public class ScrubTest extends SchemaLoader
         file.close();
 
         // with skipCorrupted == false, the scrub is expected to fail
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
         try
         {
             scrubber.scrub();
@@ -121,10 +123,9 @@ public class ScrubTest extends SchemaLoader
         catch (IOError err) {}
 
         // with skipCorrupted == true, the corrupt row will be skipped
-        scrubber = new Scrubber(cfs, sstable, true);
+        scrubber = new Scrubber(cfs, sstable, true, false);
         scrubber.scrub();
         scrubber.close();
-        cfs.replaceCompactedSSTables(Collections.singletonList(sstable), 
Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
         assertEquals(1, cfs.getSSTables().size());
 
         // verify that we can read all of the rows, and there is now one less 
row
@@ -206,7 +207,7 @@ public class ScrubTest extends SchemaLoader
         assert root != null;
         File rootDir = new File(root);
         assert rootDir.isDirectory();
-        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), 
rootDir, KEYSPACE, columnFamily, 1, false);
+        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), 
rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL);
         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, 
desc.cfname);
 
         try
@@ -227,7 +228,7 @@ public class ScrubTest extends SchemaLoader
         components.add(Component.TOC);
         SSTableReader sstable = SSTableReader.openNoValidation(desc, 
components, metadata);
 
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, true);
         scrubber.scrub();
 
         cfs.loadNewSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f8fcf76..900abd8 100644
--- 
a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ 
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -18,19 +18,22 @@
  */
 package org.apache.cassandra.io.compress;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Collections;
 import java.util.Random;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -61,7 +64,7 @@ public class CompressedRandomAccessReaderTest
         {
 
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-            CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata", false, new 
CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, 
String>emptyMap()), sstableMetadataCollector);
+            CompressedSequentialWriter writer = new 
CompressedSequentialWriter(f, filename + ".metadata", new 
CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, 
String>emptyMap()), sstableMetadataCollector);
 
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
@@ -101,8 +104,8 @@ public class CompressedRandomAccessReaderTest
         {
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(new 
SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", 
false, new CompressionParameters(SnappyCompressor.instance), 
sstableMetadataCollector)
-                : new SequentialWriter(f, 
CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+                ? new CompressedSequentialWriter(f, filename + ".metadata", 
new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
+                : new SequentialWriter(f, 
CompressionParameters.DEFAULT_CHUNK_LENGTH);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();
@@ -151,7 +154,7 @@ public class CompressedRandomAccessReaderTest
         metadata.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new 
SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
-        SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(), false, new 
CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
+        SequentialWriter writer = new CompressedSequentialWriter(file, 
metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), 
sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());
         writer.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java 
b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e26d0f5..2dc07ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -79,7 +79,7 @@ public class LegacySSTableTest extends SchemaLoader
     protected Descriptor getDescriptor(String ver)
     {
         File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + 
File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, false);
+        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, 
Descriptor.Type.FINAL);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 292a51e..19a0b13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -60,7 +60,7 @@ public class SSTableUtils
         File keyspaceDir = new File(tempdir, keyspaceName);
         keyspaceDir.mkdir();
         keyspaceDir.deleteOnExit();
-        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, 
cfname, generation, false).filenameFor("Data.db"));
+        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, 
cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db"));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index da0e31a..7751a51 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable.metadata;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,8 +27,8 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
@@ -76,7 +75,7 @@ public class MetadataSerializerTest
             serializer.serialize(originalMetadata, out);
         }
 
-        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, 
statsFile.getParentFile(), "", "", 0, false);
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, 
statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
         try (RandomAccessReader in = RandomAccessReader.open(statsFile))
         {
             Map<MetadataType, MetadataComponent> deserialized = 
serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java 
b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 2a8c7a9..fb45dd3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -132,7 +132,7 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32, true);
+        final SequentialWriter writer = new SequentialWriter(file, 32);
         DataOutputStreamAndChannel write = new 
DataOutputStreamAndChannel(writer, writer);
         DataInput canon = testWrite(write);
         write.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
 
b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 8d9480b..dbc1ec2 100644
--- 
a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -63,7 +63,7 @@ public class CompressedInputStreamTest
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new 
SimpleDenseCellNameType(BytesType.instance));
         CompressionParameters param = new 
CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new 
CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), 
false, param, collector);
+        CompressedSequentialWriter writer = new 
CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), 
param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();
         for (long l = 0L; l < 1000; l++)
         {

Reply via email to