Updated Branches: refs/heads/trunk 9f867ea4c -> 2bfea99dd
Revert "merge from 0.1" This reverts commit 9f867ea4c20b40e2bcd07fb43dc888bd3601474a, reversing changes made to 681e2dea7679e0008cf149afc2b01f4b150d006c. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bfea99d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bfea99d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bfea99d Branch: refs/heads/trunk Commit: 2bfea99ddb3451d5b0710cd0462980f56e776c70 Parents: 9f867ea Author: Jonathan Ellis <[email protected]> Authored: Wed Jun 27 11:14:09 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Jun 27 11:16:31 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/cache/AutoSavingCache.java | 144 ++++++++------- .../org/apache/cassandra/db/ColumnFamilyStore.java | 128 ++++++-------- .../unit/org/apache/cassandra/db/RowCacheTest.java | 2 +- 3 files changed, 134 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 7eed2a0..41b8f5d 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.cassandra.cache; @@ -35,11 +34,12 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; -import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.SequentialWriter; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.WrappedRunnable; import org.apache.cassandra.utils.Pair; @@ -53,15 +53,19 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K protected volatile ScheduledFuture<?> saveTask; protected final CacheService.CacheType cacheType; - public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType) + private CacheSerializer<K, V> cacheLoader; + private static final String CURRENT_VERSION = "b"; + + public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader) { super(cache); this.cacheType = cacheType; + this.cacheLoader = cacheloader; } - public File getCachePath(String ksName, String cfName) + public File getCachePath(String ksName, String cfName, String version) { - return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cacheType); + return DatabaseDescriptor.getSerializedCachePath(ksName, cfName, cacheType, version); } public Writer getWriter(int keysToSave) @@ -92,41 +96,51 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } } - public Set<DecoratedKey> readSaved(String ksName, String cfName) + public int loadSaved(ColumnFamilyStore cfs) { - File path = getCachePath(ksName, cfName); - Set<DecoratedKey> keys = new TreeSet<DecoratedKey>(); + int count = 0; + long start = System.currentTimeMillis(); + File path = getCachePath(cfs.table.name, cfs.columnFamily, null); if (path.exists()) { DataInputStream in = null; try { - long start = System.currentTimeMillis(); + logger.info(String.format("reading saved cache %s", path)); + in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); + Set<ByteBuffer> keys = new HashSet<ByteBuffer>(); + while (in.available() > 0) + { + keys.add(ByteBufferUtil.readWithLength(in)); + count++; + } + cacheLoader.load(keys, cfs); + } + catch (Exception e) + { + logger.warn(String.format("error reading saved cache %s, keys loaded so far: %d", path.getAbsolutePath(), count), e); + return count; + } + finally + { + FileUtils.closeQuietly(in); + } + } + path = getCachePath(cfs.table.name, cfs.columnFamily, CURRENT_VERSION); + if (path.exists()) + { + DataInputStream in = null; + try + { logger.info(String.format("reading saved cache %s", path)); in = new DataInputStream(new BufferedInputStream(new FileInputStream(path))); while (in.available() > 0) { - int size = in.readInt(); - byte[] bytes = new byte[size]; - in.readFully(bytes); - ByteBuffer buffer = ByteBuffer.wrap(bytes); - DecoratedKey key; - try - { - key = StorageService.getPartitioner().decorateKey(buffer); - } - catch (Exception e) - { - logger.info(String.format("unable to read entry #%s from saved cache %s; skipping remaining entries", - keys.size(), path.getAbsolutePath()), e); - break; - } - keys.add(key); + Pair<K, V> entry = cacheLoader.deserialize(in, cfs); + put(entry.left, entry.right); + count++; } - if (logger.isDebugEnabled()) - logger.debug(String.format("completed reading (%d ms; %d keys) saved cache %s", - System.currentTimeMillis() - start, keys.size(), path)); } catch (Exception e) { @@ -137,7 +151,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K FileUtils.closeQuietly(in); } } - return keys; + if (logger.isDebugEnabled()) + logger.debug(String.format("completed reading (%d ms; %d keys) saved cache %s", + System.currentTimeMillis() - start, count, path)); + return count; } public Future<?> submitWrite(int keysToSave) @@ -158,22 +175,11 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } } - public int estimateSizeToSave(Set<K> keys) - { - int bytes = 0; - - for (K key : keys) - bytes += key.serializedSize(); - - return bytes; - } - public class Writer extends CompactionInfo.Holder { private final Set<K> keys; private final CompactionInfo info; - private final long estimatedTotalBytes; - private long bytesWritten; + private long keysWritten; protected Writer(int keysToSave) { @@ -182,9 +188,6 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K else keys = hotKeySet(keysToSave); - // an approximation -- the keyset can change while saving - estimatedTotalBytes = estimateSizeToSave(keys); - OperationType type; if (cacheType == CacheService.CacheType.KEY_CACHE) type = OperationType.KEY_CACHE_SAVE; @@ -196,15 +199,14 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K info = new CompactionInfo(new CFMetaData("system", cacheType.toString(), null, null, null), type, 0, - estimatedTotalBytes); + keys.size(), + "keys"); } public CompactionInfo getCompactionInfo() { - long bytesWritten = this.bytesWritten; - // keyset can change in size, thus totalBytes can too - return info.forProgress(bytesWritten, - Math.max(bytesWritten, estimatedTotalBytes)); + // keyset can change in size, thus total can too + return info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); } public void saveCache() throws IOException @@ -212,7 +214,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K logger.debug("Deleting old {} files.", cacheType); deleteOldCacheFiles(); - if (keys.size() == 0 || estimatedTotalBytes == 0) + if (keys.size() == 0 || keys.size() == 0) { logger.debug("Skipping {} save, cache is empty.", cacheType); return; @@ -224,7 +226,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K try { - for (CacheKey key : keys) + for (K key : keys) { Pair<String, String> path = key.getPathInfo(); SequentialWriter writer = writers.get(path); @@ -234,9 +236,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K writer = tempCacheFile(path); writers.put(path, writer); } - - key.write(writer.stream); - bytesWritten += key.serializedSize(); + cacheLoader.serialize(key, writer.stream); + keysWritten++; } } finally @@ -251,7 +252,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K SequentialWriter writer = info.getValue(); File tmpFile = new File(writer.getPath()); - File cacheFile = getCachePath(path.left, path.right); + File cacheFile = getCachePath(path.left, path.right, CURRENT_VERSION); cacheFile.delete(); // ignore error if it didn't exist if (!tmpFile.renameTo(cacheFile)) @@ -263,13 +264,12 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K private SequentialWriter tempCacheFile(Pair<String, String> pathInfo) throws IOException { - File path = getCachePath(pathInfo.left, pathInfo.right); + File path = getCachePath(pathInfo.left, pathInfo.right, CURRENT_VERSION); File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile()); return SequentialWriter.open(tmpFile, true); } - private void deleteOldCacheFiles() { File savedCachesDir = new File(DatabaseDescriptor.getSavedCachesLocation()); @@ -283,8 +283,24 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K if (!file.delete()) logger.warn("Failed to delete {}", file.getAbsolutePath()); } + + if (file.isFile() && file.getName().endsWith(CURRENT_VERSION + ".db")) + { + if (!file.delete()) + logger.warn("Failed to delete {}", file.getAbsolutePath()); + } } } } } + + public interface CacheSerializer<K extends CacheKey, V> + { + void serialize(K key, DataOutput out) throws IOException; + + Pair<K, V> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; + + @Deprecated + void load(Set<ByteBuffer> buffer, ColumnFamilyStore cfs); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 582c097..dc2459b 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.db; import java.io.File; @@ -35,7 +34,6 @@ import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; @@ -43,7 +41,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.columniterator.IColumnIterator; +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; @@ -66,19 +64,17 @@ import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.IntervalTree.Interval; -import org.apache.cassandra.utils.IntervalTree.IntervalTree; import org.cliffc.high_scale_lib.NonBlockingHashMap; import static org.apache.cassandra.config.CFMetaData.Caching; public class ColumnFamilyStore implements ColumnFamilyStoreMBean { - private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); /* * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, - * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads. + * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. * * There are two other things that maybeSwitchMemtable does. * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete @@ -117,12 +113,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private volatile int memtableSwitchCount = 0; /* This is used to generate the next index for a SSTable */ - private AtomicInteger fileIndexGenerator = new AtomicInteger(0); + private final AtomicInteger fileIndexGenerator = new AtomicInteger(0); public final SecondaryIndexManager indexManager; - private LatencyTracker readStats = new LatencyTracker(); - private LatencyTracker writeStats = new LatencyTracker(); + private final LatencyTracker readStats = new LatencyTracker(); + private final LatencyTracker writeStats = new LatencyTracker(); // counts of sstables accessed by reads private final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35); @@ -226,12 +222,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // scan for sstables corresponding to this cf and load them data = new DataTracker(this); - Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == Caching.ROWS_ONLY - ? Collections.<DecoratedKey>emptySet() - : CacheService.instance.keyCache.readSaved(table.name, columnFamily); - Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true); - data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner)); + data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner)); + if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) + CacheService.instance.keyCache.loadSaved(this); // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); @@ -323,7 +317,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Descriptor desc = entry.getKey(); generations.add(desc.generation); if (!desc.isCompatible()) - throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", Descriptor.CURRENT_VERSION, desc)); + throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", Descriptor.Version.CURRENT, desc)); } Collections.sort(generations); int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0; @@ -408,19 +402,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long start = System.currentTimeMillis(); - AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = CacheService.instance.rowCache; - - // results are sorted on read (via treeset) because there are few reads and many writes and reads only happen at startup - int cachedRowsRead = 0; - for (DecoratedKey key : rowCache.readSaved(table.name, columnFamily)) - { - ColumnFamily data = getTopLevelColumns(QueryFilter.getIdentityFilter(key, new QueryPath(columnFamily)), - Integer.MIN_VALUE, - true); - CacheService.instance.rowCache.put(new RowCacheKey(metadata.cfId, key), data); - cachedRowsRead++; - } - + int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this); if (cachedRowsRead > 0) logger.info(String.format("completed loading (%d ms; %d keys) row cache for %s.%s", System.currentTimeMillis() - start, @@ -466,7 +448,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (!descriptor.isCompatible()) throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s", - Descriptor.CURRENT_VERSION, + Descriptor.Version.CURRENT, descriptor)); Descriptor newDescriptor = new Descriptor(descriptor.version, @@ -481,7 +463,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), Collections.<DecoratedKey>emptySet(), data, metadata, partitioner); + reader = SSTableReader.open(newDescriptor, entry.getValue(), data, metadata, partitioner); } catch (IOException e) { @@ -558,7 +540,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * When the sstable object is closed, it will be renamed to a non-temporary * format, so incomplete sstables can be recognized and removed on startup. */ - public String getFlushPath(long estimatedSize, String version) + public String getFlushPath(long estimatedSize, Descriptor.Version version) { File location = directories.getDirectoryForNewSSTables(estimatedSize); if (location == null) @@ -566,7 +548,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return getTempSSTablePath(location, version); } - public String getTempSSTablePath(File directory, String version) + public String getTempSSTablePath(File directory, Descriptor.Version version) { Descriptor desc = new Descriptor(version, directory, @@ -579,7 +561,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public String getTempSSTablePath(File directory) { - return getTempSSTablePath(directory, Descriptor.CURRENT_VERSION); + return getTempSSTablePath(directory, Descriptor.Version.CURRENT); } /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */ @@ -759,11 +741,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore) { - if (cf.getColumnCount() == 0 && (!cf.isMarkedForDelete() || cf.getLocalDeletionTime() < gcBefore)) - return null; - cf.maybeResetDeletionTimes(gcBefore); - return cf; + return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf; } /* @@ -799,10 +778,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean IColumn c = iter.next(); ByteBuffer cname = c.name(); // remove columns if - // (a) the column itself is tombstoned or - // (b) the CF is tombstoned and the column is not newer than it - if (c.getLocalDeletionTime() < gcBefore - || c.timestamp() <= cf.getMarkedForDeleteAt()) + // (a) the column itself is gcable or + // (b) the column is shadowed by a CF tombstone + if (c.getLocalDeletionTime() < gcBefore || cf.deletionInfo().isDeleted(c)) { iter.remove(); } @@ -818,28 +796,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean while (iter.hasNext()) { SuperColumn c = (SuperColumn)iter.next(); - long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt()); Iterator<IColumn> subIter = c.getSubColumns().iterator(); while (subIter.hasNext()) { IColumn subColumn = subIter.next(); // remove subcolumns if - // (a) the subcolumn itself is tombstoned or - // (b) the supercolumn is tombstoned and the subcolumn is not newer than it - if (subColumn.timestamp() <= minTimestamp - || subColumn.getLocalDeletionTime() < gcBefore) + // (a) the subcolumn itself is gcable or + // (b) the supercolumn is shadowed by the CF and the column is not newer + // (b) the subcolumn is shadowed by the supercolumn + if (subColumn.getLocalDeletionTime() < gcBefore + || cf.deletionInfo().isDeleted(c.name(), subColumn.timestamp()) + || c.deletionInfo().isDeleted(subColumn)) { subIter.remove(); } } - if (c.getSubColumns().isEmpty() && (!c.isMarkedForDelete() || c.getLocalDeletionTime() < gcBefore)) + c.maybeResetDeletionTimes(gcBefore); + if (c.getSubColumns().isEmpty() && !c.isMarkedForDelete()) { iter.remove(); } - else - { - c.maybeResetDeletionTimes(gcBefore); - } } } @@ -855,12 +831,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (sstables.isEmpty()) return ImmutableSet.of(); - IntervalTree<SSTableReader> tree = data.getView().intervalTree; + DataTracker.SSTableIntervalTree tree = data.getView().intervalTree; Set<SSTableReader> results = null; for (SSTableReader sstable : sstables) { - Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(new Interval<SSTableReader>(sstable.first, sstable.last))); + Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last))); assert overlaps.contains(sstable); results = results == null ? overlaps : Sets.union(results, overlaps); } @@ -1131,7 +1107,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * @return the entire row for filter.key, if present in the cache (or we can cache it), or just the column * specified by filter otherwise */ - private ColumnFamily getThroughCache(Integer cfId, QueryFilter filter) + private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter) { assert isRowCacheEnabled() : String.format("Row cache is not enabled on column family [" + getColumnFamilyName() + "]"); @@ -1190,7 +1166,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore); } - Integer cfId = Schema.instance.getId(table.name, this.columnFamily); + UUID cfId = Schema.instance.getId(table.name, this.columnFamily); if (cfId == null) return null; // secondary index @@ -1214,8 +1190,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter, int gcBefore) { ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory(), filter.filter.isReversed()); - IColumnIterator ci = filter.getMemtableColumnIterator(cached, null); - filter.collateColumns(cf, Collections.singletonList(ci), gcBefore); + OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null); + filter.collateOnDiskAtom(cf, Collections.singletonList(ci), gcBefore); // TODO this is necessary because when we collate supercolumns together, we don't check // their subcolumns for relevance, so we need to do a second prune post facto here. return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore); @@ -1265,7 +1241,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean while (true) { view = data.getView(); - sstables = view.intervalTree.search(new Interval(key, key)); + sstables = view.intervalTree.search(key); if (SSTableReader.acquireReferences(sstables)) break; // retry w/ new view @@ -1285,9 +1261,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { view = data.getView(); // startAt == minimum is ok, but stopAt == minimum is confusing because all IntervalTree deals with - // is Comparable, so it won't know to special-case that. - Comparable stopInTree = stopAt.isMinimum() ? view.intervalTree.max() : stopAt; - sstables = view.intervalTree.search(new Interval(startWith, stopInTree)); + // is Comparable, so it won't know to special-case that. However max() should not be call if the + // intervalTree is empty sochecking that first + // + if (view.intervalTree.isEmpty()) + { + sstables = Collections.<SSTableReader>emptyList(); + break; + } + + RowPosition stopInTree = stopAt.isMinimum() ? view.intervalTree.max() : stopAt; + sstables = view.intervalTree.search(Interval.<RowPosition, SSTableReader>create(startWith, stopInTree)); if (SSTableReader.acquireReferences(sstables)) break; // retry w/ new view @@ -1305,7 +1289,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SSTableReader sstr : view.sstables) { // check if the key actually exists in this sstable, without updating cache and stats - if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) > -1) + if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null) files.add(sstr.getFilename()); } return files; @@ -1315,7 +1299,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } } - private ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, boolean forCache) + public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, boolean forCache) { CollationController controller = new CollationController(this, forCache, filter, gcBefore); ColumnFamily columns = controller.getTopLevelColumns(); @@ -1607,7 +1591,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void invalidateCachedRow(DecoratedKey key) { - Integer cfId = Schema.instance.getId(table.name, this.columnFamily); + UUID cfId = Schema.instance.getId(table.name, this.columnFamily); if (cfId == null) return; // secondary index @@ -1637,10 +1621,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return cfses; } - public Iterable<DecoratedKey<?>> keySamples(Range<Token> range) + public Iterable<DecoratedKey> keySamples(Range<Token> range) { Collection<SSTableReader> sstables = getSSTables(); - Iterable<DecoratedKey<?>>[] samples = new Iterable[sstables.size()]; + Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()]; int i = 0; for (SSTableReader sstable: sstables) { @@ -1903,12 +1887,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return partitioner instanceof LocalPartitioner; } - private String getParentColumnfamily() - { - assert isIndex(); - return columnFamily.split("\\.")[0]; - } - private ByteBuffer intern(ByteBuffer name) { ByteBuffer internedName = internedNames.get(name); @@ -1941,7 +1919,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize, ReplayPosition context) throws IOException { SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context); - return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.CURRENT_VERSION), + return new SSTableWriter(getFlushPath(estimatedSize, Descriptor.Version.CURRENT), estimatedRows, metadata, partitioner, http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bfea99d/test/unit/org/apache/cassandra/db/RowCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java index e6b4578..99b8cbc 100644 --- a/test/unit/org/apache/cassandra/db/RowCacheTest.java +++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java @@ -153,6 +153,6 @@ public class RowCacheTest extends SchemaLoader // empty the cache again to make sure values came from disk CacheService.instance.invalidateRowCache(); assert CacheService.instance.rowCache.size() == 0; - assert CacheService.instance.rowCache.readSaved(KEYSPACE, COLUMN_FAMILY).size() == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave); + assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave); } }
