Repository: cassandra Updated Branches: refs/heads/trunk 7e27b55fd -> f22e775fb
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/service/FileCacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java index d22763b..9f57995 100644 --- a/src/java/org/apache/cassandra/service/FileCacheService.java +++ b/src/java/org/apache/cassandra/service/FileCacheService.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.cache.*; import org.slf4j.Logger; @@ -41,36 +42,66 @@ public class FileCacheService public static FileCacheService instance = new FileCacheService(); - private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>() + private static final AtomicLong cacheKeyIdCounter = new AtomicLong(); + public static final class CacheKey + { + final long id; + public CacheKey() + { + this.id = cacheKeyIdCounter.incrementAndGet(); + } + public boolean equals(Object that) + { + return that instanceof CacheKey && ((CacheKey) that).id == this.id; + } + public int hashCode() + { + return (int) id; + } + } + + private static final Callable<CacheBucket> cacheForPathCreator = new Callable<CacheBucket>() { @Override - public Queue<RandomAccessReader> call() + public CacheBucket call() { - return new ConcurrentLinkedQueue<RandomAccessReader>(); + return new CacheBucket(); } }; private static final AtomicInteger memoryUsage = new AtomicInteger(); - private final Cache<String, Queue<RandomAccessReader>> cache; + private final Cache<CacheKey, CacheBucket> cache; private final FileCacheMetrics metrics = new FileCacheMetrics(); + private static final class CacheBucket + { + final ConcurrentLinkedQueue<RandomAccessReader> queue = new ConcurrentLinkedQueue<>(); + volatile boolean discarded = false; + } + protected FileCacheService() { - RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>() + RemovalListener<CacheKey, CacheBucket> onRemove = new RemovalListener<CacheKey, CacheBucket>() { @Override - public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification) + public void onRemoval(RemovalNotification<CacheKey, CacheBucket> notification) { - Queue<RandomAccessReader> cachedInstances = notification.getValue(); - if (cachedInstances == null) + CacheBucket bucket = notification.getValue(); + if (bucket == null) return; - if (cachedInstances.size() > 0) - logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath()); - - for (RandomAccessReader reader : cachedInstances) + // set discarded before deallocating the readers, to ensure we don't leak any + bucket.discarded = true; + Queue<RandomAccessReader> q = bucket.queue; + boolean first = true; + for (RandomAccessReader reader = q.poll() ; reader != null ; reader = q.poll()) { + if (logger.isDebugEnabled() && first) + { + logger.debug("Evicting cold readers for {}", reader.getPath()); + first = false; + } memoryUsage.addAndGet(-1 * reader.getTotalBufferSize()); reader.deallocate(); } @@ -81,15 +112,16 @@ public class FileCacheService .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS) .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders()) .removalListener(onRemove) + .initialCapacity(16 << 10) .build(); } - public RandomAccessReader get(String path) + public RandomAccessReader get(CacheKey key) { metrics.requests.mark(); - Queue<RandomAccessReader> instances = getCacheFor(path); - RandomAccessReader result = instances.poll(); + CacheBucket bucket = getCacheFor(key); + RandomAccessReader result = bucket.queue.poll(); if (result != null) { metrics.hits.mark(); @@ -99,11 +131,11 @@ public class FileCacheService return result; } - private Queue<RandomAccessReader> getCacheFor(String path) + private CacheBucket getCacheFor(CacheKey key) { try { - return cache.get(path, cacheForPathCreator); + return cache.get(key, cacheForPathCreator); } catch (ExecutionException e) { @@ -111,34 +143,46 @@ public class FileCacheService } } - public void put(RandomAccessReader instance) + public void put(CacheKey cacheKey, RandomAccessReader instance) { int memoryUsed = memoryUsage.get(); if (logger.isDebugEnabled()) logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes()); - if (memoryUsed >= MEMORY_USAGE_THRESHOLD) + CacheBucket bucket = cache.getIfPresent(cacheKey); + if (memoryUsed >= MEMORY_USAGE_THRESHOLD || bucket == null) { instance.deallocate(); } else { memoryUsage.addAndGet(instance.getTotalBufferSize()); - getCacheFor(instance.getPath()).add(instance); + bucket.queue.add(instance); + if (bucket.discarded) + { + RandomAccessReader reader = bucket.queue.poll(); + if (reader != null) + { + memoryUsage.addAndGet(-1 * reader.getTotalBufferSize()); + reader.deallocate(); + } + } } } - public void invalidate(String path) + public void invalidate(CacheKey cacheKey, String path) { - logger.debug("Invalidating cache for {}", path); - cache.invalidate(path); + if (logger.isDebugEnabled()) + logger.debug("Invalidating cache for {}", path); + cache.invalidate(cacheKey); } + // TODO: this method is unsafe, as it calls getTotalBufferSize() on items that can have been discarded public long sizeInBytes() { long n = 0; - for (Queue<RandomAccessReader> queue : cache.asMap().values()) - for (RandomAccessReader reader : queue) + for (CacheBucket bucket : cache.asMap().values()) + for (RandomAccessReader reader : bucket.queue) n += reader.getTotalBufferSize(); return n; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/streaming/StreamLockfile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java index 0eb01c5..4d20479 100644 --- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java +++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java @@ -21,15 +21,20 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.StandardOpenOption; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; import com.google.common.base.Charsets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Encapsulates the behavior for 'locking' any streamed sttables to a node. @@ -69,7 +74,7 @@ public class StreamLockfile /* write out the file names *without* the 'tmp-file' flag in the file name. this class will not need to clean up tmp files (on restart), CassandraDaemon does that already, just make sure we delete the fully-formed SSTRs. */ - sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename()); + sstablePaths.add(writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename()); } try http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index e91f58f..78d4d9e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -117,17 +117,6 @@ public class StandaloneScrubber scrubber.close(); } - if (manifest != null) - { - if (scrubber.getNewInOrderSSTable() != null) - manifest.add(scrubber.getNewInOrderSSTable()); - - List<SSTableReader> added = scrubber.getNewSSTable() == null - ? Collections.<SSTableReader>emptyList() - : Collections.singletonList(scrubber.getNewSSTable()); - manifest.replace(Collections.singletonList(sstable), added); - } - // Remove the sstable (it's been copied by scrub and snapshotted) sstable.markObsolete(); sstable.releaseReference(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneSplitter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java index 8b92586..9353ce9 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java +++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java @@ -140,10 +140,6 @@ public class StandaloneSplitter try { new SSTableSplitter(cfs, sstable, options.sizeInMB).split(); - - // Remove the sstable - sstable.markObsolete(); - sstable.releaseReference(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java index a00245b..55f206e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java +++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java @@ -97,9 +97,6 @@ public class StandaloneUpgrader { Upgrader upgrader = new Upgrader(cfs, sstable, handler); upgrader.upgrade(); - - sstable.markObsolete(); - sstable.releaseReference(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/CLibrary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java index ac9f863..1d3c014 100644 --- a/src/java/org/apache/cassandra/utils/CLibrary.java +++ b/src/java/org/apache/cassandra/utils/CLibrary.java @@ -18,6 +18,9 @@ package org.apache.cassandra.utils; import java.io.FileDescriptor; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; import java.lang.reflect.Field; import org.slf4j.Logger; @@ -139,6 +142,25 @@ public final class CLibrary } } + public static void trySkipCache(String path, long offset, long len) + { + trySkipCache(getfd(path), offset, len); + } + + public static void trySkipCache(int fd, long offset, long len) + { + if (len == 0) + trySkipCache(fd, 0, 0); + + while (len > 0) + { + int sublen = (int) Math.min(Integer.MAX_VALUE, len); + trySkipCache(fd, offset, sublen); + len -= sublen; + offset -= sublen; + } + } + public static void trySkipCache(int fd, long offset, int len) { if (fd < 0) @@ -280,33 +302,30 @@ public final class CLibrary return -1; } - /** - * Suggest kernel to preheat one page for the given file. - * - * @param fd The file descriptor of file to preheat. - * @param position The offset of the block. - * - * @return On success, zero is returned. On error, an error number is returned. - */ - public static int preheatPage(int fd, long position) + public static int getfd(String path) { + RandomAccessFile file = null; try { - // 4096 is good for SSD because they operate on "Pages" 4KB in size - return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED); + file = new RandomAccessFile(path, "r"); + return getfd(file.getFD()); } - catch (UnsatisfiedLinkError e) + catch (Throwable t) { - // JNA is unavailable just skipping + // ignore + return -1; } - catch (RuntimeException e) + finally { - if (!(e instanceof LastErrorException)) - throw e; - - logger.warn(String.format("posix_fadvise(%d, %d) failed, errno (%d).", fd, position, errno(e))); + try + { + if (file != null) + file.close(); + } + catch (Throwable t) + { + // ignore + } } - - return -1; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java index de8da01..3007292 100644 --- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -21,7 +21,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.cassandra.cache.RefCountedMemory; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.Memory; @@ -42,7 +41,7 @@ public class OffHeapBitSet implements IBitSet try { long byteCount = wordCount * 8L; - bytes = RefCountedMemory.allocate(byteCount); + bytes = Memory.allocate(byteCount); } catch (OutOfMemoryError e) { @@ -123,7 +122,7 @@ public class OffHeapBitSet implements IBitSet public static OffHeapBitSet deserialize(DataInput in) throws IOException { long byteCount = in.readInt() * 8L; - Memory memory = RefCountedMemory.allocate(byteCount); + Memory memory = Memory.allocate(byteCount); for (long i = 0; i < byteCount;) { long v = in.readLong(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java index d68ba10..35c2b5e 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java @@ -99,7 +99,7 @@ public class LongCompactionsTest extends SchemaLoader long start = System.nanoTime(); final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds(); - new CompactionTask(store, sstables, gcBefore).execute(null); + new CompactionTask(store, sstables, gcBefore, false).execute(null); System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms", this.getClass().getName(), sstableCount, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index b3f7429..d180b82 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -22,15 +22,26 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.FileUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.junit.Test; @@ -41,28 +52,55 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.IndexType; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; -import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.composites.Composites; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.marshal.LexicalUUIDType; import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.ExcludingBounds; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableDeletingTask; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableSimpleWriter; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; -import static org.junit.Assert.*; -import static org.apache.cassandra.Util.*; +import static org.apache.cassandra.Util.cellname; +import static org.apache.cassandra.Util.column; +import static org.apache.cassandra.Util.dk; +import static org.apache.cassandra.Util.rp; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; @RunWith(OrderedJUnit4ClassRunner.class) public class ColumnFamilyStoreTest extends SchemaLoader @@ -916,8 +954,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader for (int version = 1; version <= 2; ++version) { - Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false); - Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false); + Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL); + Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL); for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS }) assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists()); } @@ -1697,7 +1735,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader MetadataCollector collector = new MetadataCollector(cfmeta.comparator); for (int ancestor : ancestors) collector.addAncestor(ancestor); - String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA); + String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA); return new SSTableWriter(file, 0, ActiveRepairService.UNREPAIRED_SSTABLE, http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/DirectoriesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java index 97cd21c..05e0beb 100644 --- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java +++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java @@ -19,20 +19,25 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Config.DiskFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Directories.DataDirectory; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; @@ -99,7 +104,7 @@ public class DirectoriesTest private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException { - Descriptor desc = new Descriptor(dir, KS, cf, gen, temp); + Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL); for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER }) { File f = new File(desc.filenameFor(c)); @@ -122,7 +127,7 @@ public class DirectoriesTest Directories directories = new Directories(cfm); assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables()); - Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false); + Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL); File snapshotDir = new File(cfDir(cfm), File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42"); assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42")); @@ -224,7 +229,7 @@ public class DirectoriesTest final String n = Long.toString(System.nanoTime()); Callable<File> directoryGetter = new Callable<File>() { public File call() throws Exception { - Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false); + Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL); return Directories.getSnapshotDirectory(desc, n); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index 6ca5487..c48a728 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -17,8 +17,10 @@ */ package org.apache.cassandra.db; +import java.nio.file.Files; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import org.junit.AfterClass; @@ -30,7 +32,9 @@ import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import static org.junit.Assert.assertEquals; @@ -145,11 +149,22 @@ public class KeyCacheTest extends SchemaLoader assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); + Set<SSTableReader> readers = cfs.getDataTracker().getSSTables(); + for (SSTableReader reader : readers) + reader.acquireReference(); + Util.compactAll(cfs, Integer.MAX_VALUE).get(); - // after compaction cache should have entries for - // new SSTables, if we had 2 keys in cache previously it should become 4 + // after compaction cache should have entries for new SSTables, + // but since we have kept a reference to the old sstables, + // if we had 2 keys in cache previously it should become 4 assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1); + for (SSTableReader reader : readers) + reader.releaseReference(); + + // after releasing the reference this should drop to 2 + assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); + // re-read same keys to verify that key cache didn't grow further cfs.getColumnFamily(QueryFilter.getSliceFilter(key1, COLUMN_FAMILY1, @@ -167,7 +182,7 @@ public class KeyCacheTest extends SchemaLoader 10, System.currentTimeMillis())); - assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1); + assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1); } private void assertKeyCacheSize(int expected, String keyspace, String columnFamily) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ScrubTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java index b8c7980..e820fc2 100644 --- a/test/unit/org/apache/cassandra/db/ScrubTest.java +++ b/test/unit/org/apache/cassandra/db/ScrubTest.java @@ -20,8 +20,10 @@ package org.apache.cassandra.db; * */ -import java.io.*; -import java.util.Collections; +import java.io.File; +import java.io.IOError; +import java.io.IOException; +import java.io.RandomAccessFile; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -29,7 +31,6 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang3.StringUtils; @@ -37,17 +38,18 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; -import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.Scrubber; import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.Util.cellname; @@ -112,7 +114,7 @@ public class ScrubTest extends SchemaLoader file.close(); // with skipCorrupted == false, the scrub is expected to fail - Scrubber scrubber = new Scrubber(cfs, sstable, false); + Scrubber scrubber = new Scrubber(cfs, sstable, false, false); try { scrubber.scrub(); @@ -121,10 +123,9 @@ public class ScrubTest extends SchemaLoader catch (IOError err) {} // with skipCorrupted == true, the corrupt row will be skipped - scrubber = new Scrubber(cfs, sstable, true); + scrubber = new Scrubber(cfs, sstable, true, false); scrubber.scrub(); scrubber.close(); - cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB); assertEquals(1, cfs.getSSTables().size()); // verify that we can read all of the rows, and there is now one less row @@ -206,7 +207,7 @@ public class ScrubTest extends SchemaLoader assert root != null; File rootDir = new File(root); assert rootDir.isDirectory(); - Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, false); + Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL); CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname); try @@ -227,7 +228,7 @@ public class ScrubTest extends SchemaLoader components.add(Component.TOC); SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata); - Scrubber scrubber = new Scrubber(cfs, sstable, false); + Scrubber scrubber = new Scrubber(cfs, sstable, false, true); scrubber.scrub(); cfs.loadNewSSTables(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index f8fcf76..900abd8 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -18,19 +18,22 @@ */ package org.apache.cassandra.io.compress; -import java.io.*; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.util.Collections; import java.util.Random; import org.junit.Test; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.composites.SimpleDenseCellNameType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.sstable.metadata.StatsMetadata; -import org.apache.cassandra.io.util.*; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.io.util.SequentialWriter; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -61,7 +64,7 @@ public class CompressedRandomAccessReaderTest { MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)); - CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector); + CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector); for (int i = 0; i < 20; i++) writer.write("x".getBytes()); @@ -101,8 +104,8 @@ public class CompressedRandomAccessReaderTest { MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null); SequentialWriter writer = compressed - ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector) - : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false); + ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector) + : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH); writer.write("The quick ".getBytes()); FileMark mark = writer.mark(); @@ -151,7 +154,7 @@ public class CompressedRandomAccessReaderTest metadata.deleteOnExit(); MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null); - SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector); + SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector); writer.write(CONTENT.getBytes()); writer.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index e26d0f5..2dc07ec 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -79,7 +79,7 @@ public class LegacySSTableTest extends SchemaLoader protected Descriptor getDescriptor(String ver) { File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME); - return new Descriptor(ver, directory, KSNAME, CFNAME, 0, false); + return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 292a51e..19a0b13 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -60,7 +60,7 @@ public class SSTableUtils File keyspaceDir = new File(tempdir, keyspaceName); keyspaceDir.mkdir(); keyspaceDir.deleteOnExit(); - File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, false).filenameFor("Data.db")); + File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db")); if (!datafile.createNewFile()) throw new IOException("unable to create file " + datafile); datafile.deleteOnExit(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index da0e31a..7751a51 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.io.sstable.metadata; -import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -28,8 +27,8 @@ import java.util.Set; import com.google.common.collect.Sets; import org.junit.Test; -import org.apache.cassandra.db.composites.SimpleDenseCellNameType; import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.composites.SimpleDenseCellNameType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.io.sstable.Component; @@ -76,7 +75,7 @@ public class MetadataSerializerTest serializer.serialize(originalMetadata, out); } - Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false); + Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL); try (RandomAccessReader in = RandomAccessReader.open(statsFile)) { Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/util/DataOutputTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java index 2a8c7a9..fb45dd3 100644 --- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java +++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java @@ -132,7 +132,7 @@ public class DataOutputTest public void testSequentialWriter() throws IOException { File file = FileUtils.createTempFile("dataoutput", "test"); - final SequentialWriter writer = new SequentialWriter(file, 32, true); + final SequentialWriter writer = new SequentialWriter(file, 32); DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer); DataInput canon = testWrite(write); write.flush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index 8d9480b..dbc1ec2 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -63,7 +63,7 @@ public class CompressedInputStreamTest Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath()); MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)); CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP); - CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector); + CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector); Map<Long, Long> index = new HashMap<Long, Long>(); for (long l = 0L; l < 1000; l++) {
