Author: jbellis
Date: Thu Mar 4 20:49:45 2010
New Revision: 919171
URL: http://svn.apache.org/viewvc?rev=919171&view=rev
Log:
extract SSTableReader as a superclass; subclass is RowIndexedReader
patch by Stu Hood; reviewed by jbellis for CASSANDRA-777
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
- copied, changed from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
- copied, changed from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig
- copied, changed from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Mar 4 20:49:45 2010
@@ -1084,16 +1084,16 @@
return Iterables.concat(stores);
}
- public Iterable<SSTable.KeyPosition> allIndexPositions()
+ public Iterable<DecoratedKey> allKeySamples()
{
Collection<SSTableReader> sstables = getSSTables();
- Iterable<SSTable.KeyPosition>[] positions = new
Iterable[sstables.size()];
+ Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()];
int i = 0;
for (SSTableReader sstable: sstables)
{
- positions[i++] = sstable.getIndexPositions();
+ samples[i++] = sstable.getKeySamples();
}
- return Iterables.concat(positions);
+ return Iterables.concat(samples);
}
/**
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
(from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
Thu Mar 4 20:49:45 2010
@@ -28,10 +28,9 @@
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
import org.apache.cassandra.cache.InstrumentedCache;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.service.StorageService;
@@ -45,110 +44,32 @@
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
/**
- * SSTableReaders are open()ed by Table.onStart; after that they are created
by SSTableWriter.renameAndOpen.
- * Do not re-call open() on existing SSTable files; use the references kept by
ColumnFamilyStore post-start instead.
+ * Pre 0.7 SSTable implementation, using per row indexes.
*/
-public class SSTableReader extends SSTable implements Comparable<SSTableReader>
+class RowIndexedReader extends SSTableReader
{
- private static final Logger logger = Logger.getLogger(SSTableReader.class);
+ private static final Logger logger =
Logger.getLogger(RowIndexedReader.class);
- // `finalizers` is required to keep the PhantomReferences alive after the
enclosing SSTR is itself
- // unreferenced. otherwise they will never get enqueued.
- private static final Set<Reference<SSTableReader>> finalizers = new
HashSet<Reference<SSTableReader>>();
- private static final ReferenceQueue<SSTableReader> finalizerQueue = new
ReferenceQueue<SSTableReader>()
- {{
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- while (true)
- {
- SSTableDeletingReference r = null;
- try
- {
- r = (SSTableDeletingReference) finalizerQueue.remove();
- finalizers.remove(r);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- try
- {
- r.cleanup();
- }
- catch (IOException e)
- {
- logger.error("Error deleting " + r.path, e);
- }
- }
- }
- };
- new Thread(runnable, "SSTABLE-DELETER").start();
- }};
private static final long BUFFER_SIZE = Integer.MAX_VALUE;
- public static int indexInterval()
- {
- return INDEX_INTERVAL;
- }
-
- public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
- {
- long count = 0;
-
- for (SSTableReader sstable : sstables)
- {
- int indexKeyCount = sstable.getIndexPositions().size();
- count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
- if (logger.isDebugEnabled())
- logger.debug("index size for bloom filter calc for file : " +
sstable.getFilename() + " : " + count);
- }
-
- return count;
- }
-
- public static SSTableReader open(Descriptor desc) throws IOException
- {
- return open(desc.filenameFor(COMPONENT_DATA));
- }
-
- public static SSTableReader open(String dataFileName) throws IOException
- {
- return open(dataFileName, StorageService.getPartitioner());
- }
-
- public static SSTableReader open(String dataFileName, IPartitioner
partitioner) throws IOException
- {
- assert partitioner != null;
-
- long start = System.currentTimeMillis();
- SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
- logger.info("Sampling index for " + dataFileName);
- sstable.loadIndexFile();
- sstable.loadBloomFilter();
-
- if (logger.isDebugEnabled())
- logger.debug("INDEX LOAD TIME for " + dataFileName + ": " +
(System.currentTimeMillis() - start) + " ms.");
-
- return sstable;
- }
-
- private volatile SSTableDeletingReference phantomReference;
// jvm can only map up to 2GB at a time, so we split index/data into
segments of that size when using mmap i/o
private final MappedByteBuffer[] indexBuffers;
private final MappedByteBuffer[] buffers;
- private InstrumentedCache<Pair<Descriptor,DecoratedKey>,PositionSize>
keyCache;
+ private InstrumentedCache<Pair<Descriptor,DecoratedKey>, PositionSize>
keyCache;
- SSTableReader(String filename,
- IPartitioner partitioner,
- List<KeyPosition> indexPositions, Map<KeyPosition,
PositionSize> spannedIndexDataPositions,
- BloomFilter bloomFilter)
- throws IOException
+ RowIndexedReader(Descriptor desc,
+ IPartitioner partitioner,
+ List<KeyPosition> indexPositions,
+ Map<KeyPosition, PositionSize> spannedIndexDataPositions,
+ BloomFilter bloomFilter)
+ throws IOException
{
- super(filename, partitioner);
+ super(desc, partitioner);
if (DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
{
@@ -190,48 +111,34 @@
this.bf = bloomFilter;
}
- public void setTrackedBy(SSTableTracker tracker)
+ RowIndexedReader(Descriptor desc, IPartitioner partitioner) throws
IOException
{
- phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
- finalizers.add(phantomReference);
- keyCache = tracker.getKeyCache();
+ this(desc, partitioner, null, null, null);
}
- private static MappedByteBuffer mmap(String filename, long start, int
size) throws IOException
+ public static RowIndexedReader open(Descriptor desc, IPartitioner
partitioner) throws IOException
{
- RandomAccessFile raf;
- try
- {
- raf = new RandomAccessFile(filename, "r");
- }
- catch (FileNotFoundException e)
- {
- throw new IOError(e);
- }
-
- try
- {
- return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start,
size);
- }
- finally
- {
- raf.close();
- }
- }
+ RowIndexedReader sstable = new RowIndexedReader(desc, partitioner);
+ sstable.loadIndexFile();
+ sstable.loadBloomFilter();
- private SSTableReader(String filename, IPartitioner partitioner) throws
IOException
- {
- this(filename, partitioner, null, null, null);
+ return sstable;
}
- public List<KeyPosition> getIndexPositions()
+ public long estimatedKeys()
{
- return indexPositions;
+ return (indexPositions.size() + 1) * INDEX_INTERVAL;
}
- public long estimatedKeys()
+ public Collection<DecoratedKey> getKeySamples()
{
- return indexPositions.size() * INDEX_INTERVAL;
+ return Collections2.transform(indexPositions,
+ new Function<KeyPosition,DecoratedKey>(){
+ public DecoratedKey
apply(KeyPosition kp)
+ {
+ return kp.key;
+ }
+ });
}
void loadBloomFilter() throws IOException
@@ -296,6 +203,13 @@
}
}
+ @Override
+ public void setTrackedBy(SSTableTracker tracker)
+ {
+ super.setTrackedBy(tracker);
+ keyCache = tracker.getKeyCache();
+ }
+
/** get the position in the index file to start scanning to find the given
key (at most indexInterval keys away) */
private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
{
@@ -455,31 +369,14 @@
return desc.generation - o.desc.generation;
}
- public void markCompacted() throws IOException
- {
- if (logger.isDebugEnabled())
- logger.debug("Marking " + getFilename() + " compacted");
- if (!new File(compactedFilename()).createNewFile())
- {
- throw new IOException("Unable to create compaction marker");
- }
- phantomReference.deleteOnCleanup();
- }
-
- /** obviously only for testing */
- public void forceBloomFilterFailures()
+ public void forceFilterFailures()
{
bf = BloomFilter.alwaysMatchingBloomFilter();
}
- public IPartitioner getPartitioner()
- {
- return partitioner;
- }
-
public SSTableScanner getScanner(int bufferSize) throws IOException
{
- return new SSTableScanner(this, bufferSize);
+ return new RowIndexedScanner(this, bufferSize);
}
public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int
bufferSize) throws IOException
@@ -502,70 +399,8 @@
return (int) (position / BUFFER_SIZE);
}
- public AbstractType getColumnComparator()
- {
- return DatabaseDescriptor.getComparator(getTableName(),
getColumnFamilyName());
- }
-
- public ColumnFamily makeColumnFamily()
- {
- return ColumnFamily.create(getTableName(), getColumnFamilyName());
- }
-
- public ICompactSerializer2<IColumn> getColumnSerializer()
- {
- return DatabaseDescriptor.getColumnFamilyType(getTableName(),
getColumnFamilyName()).equals("Standard")
- ? Column.serializer()
- : SuperColumn.serializer(getColumnComparator());
- }
-}
-
-class FileSSTableMap
-{
- private final Map<String, SSTableReader> map = new
NonBlockingHashMap<String, SSTableReader>();
-
- public SSTableReader get(String filename)
- {
- try
- {
- return map.get(new File(filename).getCanonicalPath());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public SSTableReader put(String filename, SSTableReader value)
- {
- try
- {
- return map.put(new File(filename).getCanonicalPath(), value);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public Collection<SSTableReader> values()
- {
- return map.values();
- }
-
- public void clear()
- {
- map.clear();
- }
-
- public void remove(String filename) throws IOException
- {
- map.remove(new File(filename).getCanonicalPath());
- }
-
- @Override
- public String toString()
+ public InstrumentedCache getKeyCache()
{
- return "FileSSTableMap {" + StringUtils.join(map.keySet(), ", ") + "}";
+ return keyCache;
}
}
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
(from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
Thu Mar 4 20:49:45 2010
@@ -33,9 +33,9 @@
import org.apache.log4j.Logger;
-public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+public class RowIndexedScanner extends SSTableScanner
{
- private static Logger logger = Logger.getLogger(SSTableScanner.class);
+ private static Logger logger = Logger.getLogger(RowIndexedScanner.class);
private final BufferedRandomAccessFile file;
private final SSTableReader sstable;
@@ -46,7 +46,7 @@
/**
* @param sstable SSTable to scan.
*/
- SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
+ RowIndexedScanner(SSTableReader sstable, int bufferSize) throws IOException
{
this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r",
bufferSize);
this.sstable = sstable;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
Thu Mar 4 20:49:45 2010
@@ -76,6 +76,17 @@
this.partitioner = partitioner;
}
+ protected SSTable(Descriptor desc, IPartitioner partitioner)
+ {
+ this.desc = desc;
+ this.partitioner = partitioner;
+ }
+
+ public IPartitioner getPartitioner()
+ {
+ return partitioner;
+ }
+
public Descriptor getDescriptor()
{
return desc;
@@ -335,6 +346,14 @@
}
/**
+ * @return A clone of this descriptor with the given 'temporary'
status.
+ */
+ public Descriptor asTemporary(boolean temporary)
+ {
+ return new Descriptor(version, directory, ksname, cfname,
generation, temporary);
+ }
+
+ /**
* @return True if the given version string is not empty, and
* contains all lowercase letters, as defined by java.lang.Character.
*/
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Thu Mar 4 20:49:45 2010
@@ -28,8 +28,6 @@
import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
-
import org.apache.cassandra.cache.InstrumentedCache;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.BloomFilter;
@@ -49,7 +47,7 @@
* SSTableReaders are open()ed by Table.onStart; after that they are created
by SSTableWriter.renameAndOpen.
* Do not re-call open() on existing SSTable files; use the references kept by
ColumnFamilyStore post-start instead.
*/
-public class SSTableReader extends SSTable implements Comparable<SSTableReader>
+public abstract class SSTableReader extends SSTable implements
Comparable<SSTableReader>
{
private static final Logger logger = Logger.getLogger(SSTableReader.class);
@@ -87,7 +85,6 @@
};
new Thread(runnable, "SSTABLE-DELETER").start();
}};
- private static final long BUFFER_SIZE = Integer.MAX_VALUE;
public static int indexInterval()
{
@@ -100,7 +97,7 @@
for (SSTableReader sstable : sstables)
{
- int indexKeyCount = sstable.getIndexPositions().size();
+ int indexKeyCount = sstable.getKeySamples().size();
count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
if (logger.isDebugEnabled())
logger.debug("index size for bloom filter calc for file : " +
sstable.getFilename() + " : " + count);
@@ -109,95 +106,48 @@
return count;
}
- public static SSTableReader open(Descriptor desc) throws IOException
+ public static SSTableReader open(String dataFileName) throws IOException
{
- return open(desc.filenameFor(COMPONENT_DATA));
+ return open(Descriptor.fromFilename(dataFileName));
}
- public static SSTableReader open(String dataFileName) throws IOException
+ public static SSTableReader open(Descriptor desc) throws IOException
{
- return open(dataFileName, StorageService.getPartitioner());
+ return open(desc, StorageService.getPartitioner());
}
public static SSTableReader open(String dataFileName, IPartitioner
partitioner) throws IOException
{
- assert partitioner != null;
-
- long start = System.currentTimeMillis();
- SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
- logger.info("Sampling index for " + dataFileName);
- sstable.loadIndexFile();
- sstable.loadBloomFilter();
-
- if (logger.isDebugEnabled())
- logger.debug("INDEX LOAD TIME for " + dataFileName + ": " +
(System.currentTimeMillis() - start) + " ms.");
-
- return sstable;
+ return open(Descriptor.fromFilename(dataFileName), partitioner);
}
- private volatile SSTableDeletingReference phantomReference;
- // jvm can only map up to 2GB at a time, so we split index/data into
segments of that size when using mmap i/o
- private final MappedByteBuffer[] indexBuffers;
- private final MappedByteBuffer[] buffers;
-
- private InstrumentedCache<Pair<Descriptor,DecoratedKey>,PositionSize>
keyCache;
-
- SSTableReader(String filename,
- IPartitioner partitioner,
- List<KeyPosition> indexPositions, Map<KeyPosition,
PositionSize> spannedIndexDataPositions,
- BloomFilter bloomFilter)
- throws IOException
+ public static SSTableReader open(Descriptor descriptor, IPartitioner
partitioner) throws IOException
{
- super(filename, partitioner);
+ assert partitioner != null;
- if (DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
- {
- long indexLength = new File(indexFilename()).length();
- int bufferCount = 1 + (int) (indexLength / BUFFER_SIZE);
- indexBuffers = new MappedByteBuffer[bufferCount];
- long remaining = indexLength;
- for (int i = 0; i < bufferCount; i++)
- {
- indexBuffers[i] = mmap(indexFilename(), i * BUFFER_SIZE, (int)
Math.min(remaining, BUFFER_SIZE));
- remaining -= BUFFER_SIZE;
- }
- }
- else
- {
- assert DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.standard;
- indexBuffers = null;
- }
+ long start = System.currentTimeMillis();
+ logger.info("Sampling index for " + descriptor);
- if (DatabaseDescriptor.getDiskAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
+ SSTableReader sstable;
+ // FIXME: version conditional readers here
+ if (true)
{
- int bufferCount = 1 + (int) (new File(getFilename()).length() /
BUFFER_SIZE);
- buffers = new MappedByteBuffer[bufferCount];
- long remaining = length();
- for (int i = 0; i < bufferCount; i++)
- {
- buffers[i] = mmap(getFilename(), i * BUFFER_SIZE, (int)
Math.min(remaining, BUFFER_SIZE));
- remaining -= BUFFER_SIZE;
- }
- }
- else
- {
- assert DatabaseDescriptor.getDiskAccessMode() ==
DatabaseDescriptor.DiskAccessMode.standard;
- buffers = null;
+ sstable = RowIndexedReader.open(descriptor, partitioner);
}
- this.indexPositions = indexPositions;
- this.spannedIndexDataPositions = spannedIndexDataPositions;
- this.bf = bloomFilter;
+ if (logger.isDebugEnabled())
+ logger.debug("INDEX LOAD TIME for " + descriptor + ": " +
(System.currentTimeMillis() - start) + " ms.");
+
+ return sstable;
}
public void setTrackedBy(SSTableTracker tracker)
{
phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
finalizers.add(phantomReference);
- keyCache = tracker.getKeyCache();
}
- private static MappedByteBuffer mmap(String filename, long start, int
size) throws IOException
+ protected static MappedByteBuffer mmap(String filename, long start, int
size) throws IOException
{
RandomAccessFile raf;
try
@@ -219,241 +169,61 @@
}
}
- private SSTableReader(String filename, IPartitioner partitioner) throws
IOException
- {
- this(filename, partitioner, null, null, null);
- }
-
- public List<KeyPosition> getIndexPositions()
- {
- return indexPositions;
- }
+ /*************************************************************************/
- public long estimatedKeys()
+ protected SSTableReader(Descriptor desc, IPartitioner partitioner)
{
- return indexPositions.size() * INDEX_INTERVAL;
+ super(desc, partitioner);
}
- void loadBloomFilter() throws IOException
- {
- DataInputStream stream = new DataInputStream(new
FileInputStream(filterFilename()));
- try
- {
- bf = BloomFilter.serializer().deserialize(stream);
- }
- finally
- {
- stream.close();
- }
- }
-
- void loadIndexFile() throws IOException
- {
- indexPositions = new ArrayList<KeyPosition>();
- // we read the positions in a BRAF so we don't have to worry about an
entry spanning a mmap boundary.
- // any entries that do, we force into the in-memory sample so key
lookup can always bsearch within
- // a single mmapped segment.
- BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(), "r");
- try
- {
- int i = 0;
- long indexSize = input.length();
- while (true)
- {
- long indexPosition = input.getFilePointer();
- if (indexPosition == indexSize)
- {
- break;
- }
- DecoratedKey decoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
- long dataPosition = input.readLong();
- long nextIndexPosition = input.getFilePointer();
- boolean spannedEntry = bufferIndex(indexPosition) !=
bufferIndex(nextIndexPosition);
- if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
- {
- KeyPosition info;
- info = new KeyPosition(decoratedKey, indexPosition);
- indexPositions.add(info);
-
- if (spannedEntry)
- {
- if (spannedIndexDataPositions == null)
- {
- spannedIndexDataPositions = new
HashMap<KeyPosition, PositionSize>();
- }
- // read the next index entry to see how big the row is
corresponding to the current, mmap-segment-spanning one
- input.readUTF();
- long nextDataPosition = input.readLong();
- input.seek(nextIndexPosition);
- spannedIndexDataPositions.put(info, new
PositionSize(dataPosition, nextDataPosition - dataPosition));
- }
- }
- }
- }
- finally
- {
- input.close();
- }
- }
+ private volatile SSTableDeletingReference phantomReference;
- /** get the position in the index file to start scanning to find the given
key (at most indexInterval keys away) */
- private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+ void addFinalizingReference(SSTableTracker tracker)
{
- assert indexPositions != null && indexPositions.size() > 0;
- int index = Collections.binarySearch(indexPositions, new
KeyPosition(decoratedKey, -1));
- if (index < 0)
- {
- // binary search gives us the first index _greater_ than the key
searched for,
- // i.e., its insertion position
- int greaterThan = (index + 1) * -1;
- if (greaterThan == 0)
- return null;
- return indexPositions.get(greaterThan - 1);
- }
- else
- {
- return indexPositions.get(index);
- }
+ phantomReference = new SSTableDeletingReference(tracker, this,
finalizerQueue);
+ finalizers.add(phantomReference);
}
/**
- * returns the position in the data file to find the given key, or -1 if
the key is not present
+ * For testing purposes only.
*/
- public PositionSize getPosition(DecoratedKey decoratedKey) throws
IOException
- {
- // first, check bloom filter
- if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
- return null;
-
- // next, the key cache
- Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor,
DecoratedKey>(desc, decoratedKey);
- if (keyCache != null && keyCache.getCapacity() > 0)
- {
- PositionSize cachedPosition = keyCache.get(unifiedKey);
- if (cachedPosition != null)
- {
- return cachedPosition;
- }
- }
-
- // next, see if the sampled index says it's impossible for the key to
be present
- KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
- if (sampledPosition == null)
- {
- return null;
- }
+ public abstract void forceFilterFailures();
- // handle exact sampled index hit
- if (spannedIndexDataPositions != null)
- {
- PositionSize info = spannedIndexDataPositions.get(sampledPosition);
- if (info != null)
- return info;
- }
+ /**
+ * @return The key cache: for monitoring purposes.
+ */
+ public abstract InstrumentedCache getKeyCache();
- // scan the on-disk index, starting at the nearest sampled position
- long p = sampledPosition.position;
- FileDataInput input;
- if (indexBuffers == null)
- {
- input = new BufferedRandomAccessFile(indexFilename(), "r");
- ((BufferedRandomAccessFile)input).seek(p);
- }
- else
- {
- input = new MappedFileDataInput(indexBuffers[bufferIndex(p)],
indexFilename(), (int)(p % BUFFER_SIZE));
- }
- try
- {
- int i = 0;
- do
- {
- DecoratedKey indexDecoratedKey;
- try
- {
- indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
- }
- catch (EOFException e)
- {
- return null;
- }
- long position = input.readLong();
- int v = indexDecoratedKey.compareTo(decoratedKey);
- if (v == 0)
- {
- PositionSize info;
- if (!input.isEOF())
- {
- int utflen = input.readUnsignedShort();
- if (utflen != input.skipBytes(utflen))
- throw new EOFException();
- info = new PositionSize(position, input.readLong() -
position);
- }
- else
- {
- info = new PositionSize(position, length() - position);
- }
- if (keyCache != null && keyCache.getCapacity() > 0)
- keyCache.put(unifiedKey, info);
- return info;
- }
- if (v > 0)
- return null;
- } while (++i < INDEX_INTERVAL);
- }
- finally
- {
- input.close();
- }
- return null;
- }
+ /**
+ * @return An estimate of the number of keys in this SSTable.
+ */
+ public abstract long estimatedKeys();
- /** like getPosition, but if key is not found will return the location of
the first key _greater_ than the desired one, or -1 if no such key exists. */
- public long getNearestPosition(DecoratedKey decoratedKey) throws
IOException
- {
- KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
- if (sampledPosition == null)
- {
- return 0;
- }
+ /**
+ * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
+ */
+ public abstract Collection<DecoratedKey> getKeySamples();
- // can't use a MappedFileDataInput here, since we might cross a
segment boundary while scanning
- BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(), "r");
- input.seek(sampledPosition.position);
- try
- {
- while (true)
- {
- DecoratedKey indexDecoratedKey;
- try
- {
- indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
- }
- catch (EOFException e)
- {
- return -1;
- }
- long position = input.readLong();
- int v = indexDecoratedKey.compareTo(decoratedKey);
- if (v >= 0)
- return position;
- }
- }
- finally
- {
- input.close();
- }
- }
+ /**
+ * Returns the position in the data file to find the given key, or null
+ * if the key is not present.
+ * FIXME: should not be public: use Scanner.
+ */
+ @Deprecated
+ public abstract PositionSize getPosition(DecoratedKey decoratedKey) throws
IOException;
- public long length()
- {
- return new File(getFilename()).length();
- }
+ /**
+ * Like getPosition, but if key is not found will return the location of
the
+ * first key _greater_ than the desired one, or -1 if no such key exists.
+ * FIXME: should not be public: use Scanner.
+ */
+ @Deprecated
+ public abstract long getNearestPosition(DecoratedKey decoratedKey) throws
IOException;
- public int compareTo(SSTableReader o)
- {
- return desc.generation - o.desc.generation;
- }
+ /**
+ * @return The length in bytes of the data file for this SSTable.
+ */
+ public abstract long length();
public void markCompacted() throws IOException
{
@@ -466,41 +236,17 @@
phantomReference.deleteOnCleanup();
}
- /** obviously only for testing */
- public void forceBloomFilterFailures()
- {
- bf = BloomFilter.alwaysMatchingBloomFilter();
- }
-
- public IPartitioner getPartitioner()
- {
- return partitioner;
- }
-
- public SSTableScanner getScanner(int bufferSize) throws IOException
- {
- return new SSTableScanner(this, bufferSize);
- }
-
- public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int
bufferSize) throws IOException
- {
- PositionSize info = getPosition(decoratedKey);
- if (info == null)
- return null;
-
- if (buffers == null || (bufferIndex(info.position) !=
bufferIndex(info.position + info.size)))
- {
- BufferedRandomAccessFile file = new
BufferedRandomAccessFile(getFilename(), "r", bufferSize);
- file.seek(info.position);
- return file;
- }
- return new MappedFileDataInput(buffers[bufferIndex(info.position)],
getFilename(), (int) (info.position % BUFFER_SIZE));
- }
+ /**
+ * @param bufferSize Buffer size in bytes for this Scanner.
+ * @return A Scanner for seeking over the rows of the SSTable.
+ */
+ public abstract SSTableScanner getScanner(int bufferSize) throws
IOException;
- static int bufferIndex(long position)
- {
- return (int) (position / BUFFER_SIZE);
- }
+ /**
+ * FIXME: should not be public: use Scanner.
+ */
+ @Deprecated
+ public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey,
int bufferSize) throws IOException;
public AbstractType getColumnComparator()
{
@@ -519,53 +265,3 @@
: SuperColumn.serializer(getColumnComparator());
}
}
-
-class FileSSTableMap
-{
- private final Map<String, SSTableReader> map = new
NonBlockingHashMap<String, SSTableReader>();
-
- public SSTableReader get(String filename)
- {
- try
- {
- return map.get(new File(filename).getCanonicalPath());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public SSTableReader put(String filename, SSTableReader value)
- {
- try
- {
- return map.put(new File(filename).getCanonicalPath(), value);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public Collection<SSTableReader> values()
- {
- return map.values();
- }
-
- public void clear()
- {
- map.clear();
- }
-
- public void remove(String filename) throws IOException
- {
- map.remove(new File(filename).getCanonicalPath());
- }
-
- @Override
- public String toString()
- {
- return "FileSSTableMap {" + StringUtils.join(map.keySet(), ", ") + "}";
- }
-}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
Thu Mar 4 20:49:45 2010
@@ -33,119 +33,11 @@
import org.apache.log4j.Logger;
-public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+public abstract class SSTableScanner implements Iterator<IteratingRow>, Closeable
{
- private static Logger logger = Logger.getLogger(SSTableScanner.class);
+ public abstract void seekTo(DecoratedKey seekKey);
- private final BufferedRandomAccessFile file;
- private final SSTableReader sstable;
- private IteratingRow row;
- private boolean exhausted = false;
- private Iterator<IteratingRow> iterator;
+ public abstract long getFileLength();
- /**
- * @param sstable SSTable to scan.
- */
- SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
- {
- this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
- this.sstable = sstable;
- }
-
- public void close() throws IOException
- {
- file.close();
- }
-
- public void seekTo(DecoratedKey seekKey)
- {
- try
- {
- long position = sstable.getNearestPosition(seekKey);
- if (position < 0)
- {
- exhausted = true;
- return;
- }
- file.seek(position);
- row = null;
- }
- catch (IOException e)
- {
- throw new RuntimeException("corrupt sstable", e);
- }
- }
-
- public long getFileLength()
- {
- try
- {
- return file.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public long getFilePointer()
- {
- return file.getFilePointer();
- }
-
- public boolean hasNext()
- {
- if (iterator == null)
- iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
- return iterator.hasNext();
- }
-
- public IteratingRow next()
- {
- if (iterator == null)
- iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
- return iterator.next();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- private class KeyScanningIterator implements Iterator<IteratingRow>
- {
- public boolean hasNext()
- {
- try
- {
- if (row == null)
- return !file.isEOF();
- return row.getEndPosition() < file.length();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public IteratingRow next()
- {
- try
- {
- if (row != null)
- row.skipRemaining();
- assert !file.isEOF();
- return row = new IteratingRow(file, sstable);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
+ public abstract long getFilePointer();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
Thu Mar 4 20:49:45 2010
@@ -60,7 +60,7 @@
for (SSTableReader sstable : replacements)
{
- assert sstable.getIndexPositions() != null;
+ assert sstable.getKeySamples() != null;
sstablesNew.add(sstable);
long size = sstable.bytesOnDisk();
liveSize.addAndGet(size);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Thu Mar 4 20:49:45 2010
@@ -84,7 +84,7 @@
if (logger.isTraceEnabled())
logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
- boolean spannedEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(indexFile.getFilePointer());
+ boolean spannedEntry = RowIndexedReader.bufferIndex(indexPosition) != RowIndexedReader.bufferIndex(indexFile.getFilePointer());
if (keysWritten++ % INDEX_INTERVAL == 0 || spannedEntry)
{
if (indexPositions == null)
@@ -144,12 +144,12 @@
// main data
dataFile.close(); // calls force
- String newpath = getFilename();
+ Descriptor newdesc = desc.asTemporary(false);
rename(indexFilename());
rename(filterFilename());
- newpath = rename(newpath); // important to do this last since index & filter file names are derived from it
+ rename(getFilename());
- return new SSTableReader(newpath, partitioner, indexPositions, spannedIndexDataPositions, bf);
+ return new RowIndexedReader(newdesc, partitioner, indexPositions, spannedIndexDataPositions, bf);
}
static String rename(String tmpFilename)
@@ -176,8 +176,7 @@
SSTableWriter.rename(indexFilename(dataFileName));
SSTableWriter.rename(filterFilename(dataFileName));
dataFileName = SSTableWriter.rename(dataFileName);
- return SSTableReader.open(dataFileName,
- StorageService.getPartitioner());
+ return SSTableReader.open(dataFileName);
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Thu Mar 4 20:49:45 2010
@@ -367,8 +367,8 @@
}
if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be necessary
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
- keys.add(info.key);
+ for (DecoratedKey sample : cfs.allKeySamples())
+ keys.add(sample);
}
if (keys.isEmpty())
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Thu Mar 4 20:49:45 2010
@@ -1221,10 +1221,10 @@
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ for (DecoratedKey sample : cfs.allKeySamples())
{
- if (range.contains(info.key.token))
- keys.add(info.key);
+ if (range.contains(sample.token))
+ keys.add(sample);
}
}
FBUtilities.sortSampledKeys(keys, range);
@@ -1250,10 +1250,10 @@
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ for (DecoratedKey key : cfs.allKeySamples())
{
- if (range.contains(info.key.token))
- keys.add(info.key);
+ if (range.contains(key.token))
+ keys.add(key);
}
}
FBUtilities.sortSampledKeys(keys, range);
Copied:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig
(from r919170,
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java)
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
(empty)
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Thu Mar 4 20:49:45 2010
@@ -69,7 +69,7 @@
Table table = Table.open("Keyspace1");
List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
assertEquals(1, ssTables.size());
- ssTables.get(0).forceBloomFilterFailures();
+ ssTables.get(0).forceFilterFailures();
ColumnFamily cf = store.getColumnFamily(new IdentityQueryFilter("key2", new QueryPath("Standard1", null, "Column1".getBytes())));
assertNull(cf);
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Thu Mar 4 20:49:45 2010
@@ -163,7 +163,7 @@
Collection<SSTableReader> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
assertEquals(1, ssTables.size());
- ssTables.iterator().next().forceBloomFilterFailures();
+ ssTables.iterator().next().forceFilterFailures();
validateGetSliceNoMatch(table);
}