Author: jbellis
Date: Thu Dec 31 21:32:13 2009
New Revision: 894944
URL: http://svn.apache.org/viewvc?rev=894944&view=rev
Log:
productize mmap approach: handle files > 2GB by mapping them in chunks, with
fallback to BufferedRandomAccessFile (BRAF) when a row crosses a chunk boundary
(checking for a boundary crossing in every read() call would almost certainly
cost more time than simply falling back to BRAF for those rows); add a retrying
delete that waits for mmapped files to be unmapped by the finalizer after
compaction
patch by jbellis; reviewed by Brandon Williams and goffinet for CASSANDRA-408
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Dec 31 21:32:13 2009
@@ -226,6 +226,16 @@
<!--======================================================================-->
<!--
+ ~ Access mode. mmapped i/o is substantially faster, but only practical on
+ ~ a 64-bit machine (which notably does not include EC2 "small" instances)
+ ~ or with relatively small datasets. "auto", the safe choice, will enable
+ ~ mmapping on a 64-bit JVM. Other values are "mmap" and "standard" if you
+ ~ need to force those modes. (The buffer size settings that follow only
+ ~ apply to standard, non-mmapped i/o.)
+ -->
+ <DiskAccessMode>auto</DiskAccessMode>
+
+ <!--
~ Buffer size to use when performing contiguous column slices. Increase
~ this to the size of the column slices you typically perform.
~ (Name-based queries are performed with a buffer size of
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Thu Dec 31 21:32:13 2009
@@ -48,7 +48,13 @@
public static enum CommitLogSync {
periodic,
batch
- };
+ }
+
+ public static enum DiskAccessMode {
+ auto,
+ mmap,
+ standard,
+ }
public static final String random_ = "RANDOM";
public static final String ophf_ = "OPHF";
@@ -121,6 +127,8 @@
private static double commitLogSyncBatchMS_;
private static int commitLogSyncPeriodMS_;
+ private static DiskAccessMode diskAccessMode_;
+
private static boolean snapshotBeforeCompaction_;
private static boolean autoBootstrap_ = false;
@@ -183,6 +191,21 @@
logger_.debug("Syncing log with a period of " +
commitLogSyncPeriodMS_);
}
+ String modeRaw = xmlUtils.getNodeValue("/Storage/DiskAccessMode");
+ try
+ {
+ diskAccessMode_ = DiskAccessMode.valueOf(modeRaw);
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException("DiskAccessMode must be
either 'auto', or 'mmap', or 'standard'");
+ }
+ if (diskAccessMode_ == DiskAccessMode.auto)
+ {
+ diskAccessMode_ = System.getProperty("os.arch").contains("64")
? DiskAccessMode.mmap : DiskAccessMode.standard;
+ logger_.info("Auto DiskAccessMode determined to be " +
diskAccessMode_);
+ }
+
/* Hashing strategy */
String partitionerClassName =
xmlUtils.getNodeValue("/Storage/Partitioner");
if (partitionerClassName == null)
@@ -955,6 +978,11 @@
return commitLogSync_;
}
+ public static DiskAccessMode getDiskAccessMode()
+ {
+ return diskAccessMode_;
+ }
+
public static double getFlushDataBufferSizeInMB()
{
return flushDataBufferSizeInMB_;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Thu Dec 31 21:32:13 2009
@@ -557,7 +557,7 @@
if (header.isSafeToDelete())
{
logger_.info("Deleting obsolete commit log:" + oldFile);
- DeletionService.deleteAsync(oldFile);
+ DeletionService.submitDelete(oldFile);
clHeaders_.remove(oldFile);
}
else
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
Thu Dec 31 21:32:13 2009
@@ -46,15 +46,12 @@
this.columns = columnNames;
DecoratedKey decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey);
- if (position < 0)
- return;
- FileDataInput file =
ssTable.getFileDataInput(DatabaseDescriptor.getIndexedReadBufferSizeInKB() *
1024);
+ FileDataInput file = ssTable.getFileDataInput(decoratedKey,
DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
+ if (file == null)
+ return;
try
{
- file.seek(position);
-
DecoratedKey keyInDisk =
ssTable.getPartitioner().convertFromDiskFormat(file.readUTF());
assert keyInDisk.equals(decoratedKey) : keyInDisk;
file.readInt(); // data size
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
Thu Dec 31 21:32:13 2009
@@ -29,7 +29,6 @@
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.config.DatabaseDescriptor;
import com.google.common.collect.AbstractIterator;
@@ -52,12 +51,12 @@
/* Morph key into actual key based on the partition type. */
DecoratedKey decoratedKey = ssTable.getPartitioner().decorateKey(key);
- long position = ssTable.getPosition(decoratedKey);
+ FileDataInput fdi = ssTable.getFileDataInput(decoratedKey,
DatabaseDescriptor.getSlicedReadBufferSizeInKB() * 1024);
this.comparator = ssTable.getColumnComparator();
this.startColumn = startColumn;
this.finishColumn = finishColumn;
- if (position >= 0)
- reader = new ColumnGroupReader(ssTable, decoratedKey, position);
+ if (fdi != null)
+ reader = new ColumnGroupReader(ssTable, decoratedKey, fdi);
}
private boolean isColumnNeeded(IColumn column)
@@ -120,11 +119,10 @@
private int curRangeIndex;
private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
- public ColumnGroupReader(SSTableReader ssTable, DecoratedKey key, long
position) throws IOException
+ public ColumnGroupReader(SSTableReader ssTable, DecoratedKey key,
FileDataInput input) throws IOException
{
- this.file =
ssTable.getFileDataInput(DatabaseDescriptor.getSlicedReadBufferSizeInKB() *
1024);
+ this.file = input;
- file.seek(position);
DecoratedKey keyInDisk =
ssTable.getPartitioner().convertFromDiskFormat(file.readUTF());
assert keyInDisk.equals(key);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
Thu Dec 31 21:32:13 2009
@@ -10,9 +10,11 @@
public class DeletionService
{
+ public static final int MAX_RETRIES = 10;
+
public static final ExecutorService executor = new
JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
- public static void deleteAsync(final String file) throws IOException
+ public static void submitDelete(final String file)
{
Runnable deleter = new WrappedRunnable()
{
@@ -24,4 +26,41 @@
};
executor.submit(deleter);
}
+
+ public static void submitDeleteWithRetry(String file)
+ {
+ submitDeleteWithRetry(file, 0);
+ }
+
+ private static void submitDeleteWithRetry(final String file, final int
retryCount)
+ {
+ Runnable deleter = new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws IOException
+ {
+ if (!new File(file).delete())
+ {
+ if (retryCount > MAX_RETRIES)
+ throw new IOException("Unable to delete " + file + "
after " + MAX_RETRIES + " tries");
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ submitDeleteWithRetry(file, retryCount + 1);
+ }
+ }).start();
+ }
+ }
+ };
+ executor.submit(deleter);
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu
Dec 31 21:32:13 2009
@@ -48,7 +48,7 @@
*/
public abstract class SSTable
{
- private static final Logger logger = Logger.getLogger(SSTable.class);
+ static final Logger logger = Logger.getLogger(SSTable.class);
public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
@@ -102,7 +102,11 @@
{
if (new File(compactedFilename(dataFilename)).exists())
{
- delete(dataFilename);
+ FileUtils.deleteWithConfirm(new File(dataFilename));
+ FileUtils.deleteWithConfirm(new
File(SSTable.indexFilename(dataFilename)));
+ FileUtils.deleteWithConfirm(new
File(SSTable.filterFilename(dataFilename)));
+ FileUtils.deleteWithConfirm(new
File(SSTable.compactedFilename(dataFilename)));
+ logger.info("Deleted " + dataFilename);
return true;
}
return false;
@@ -152,15 +156,6 @@
return new File(filename).getParentFile().getName();
}
- static void delete(String path) throws IOException
- {
- FileUtils.deleteWithConfirm(new File(path));
- FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
- FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
- FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
- logger.info("Deleted " + path);
- }
-
public static long getTotalBytes(Iterable<SSTableReader> sstables)
{
long sum = 0;
@@ -215,4 +210,16 @@
"path='" + path + '\'' +
')';
}
+
+ public static class PositionSize
+ {
+ public final long position;
+ public final long size;
+
+ public PositionSize(long position, long size)
+ {
+ this.position = position;
+ this.size = size;
+ }
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
Thu Dec 31 21:32:13 2009
@@ -90,6 +90,7 @@
};
new Thread(runnable, "SSTABLE-DELETER").start();
}};
+ private final int BUFFER_SIZE = Integer.MAX_VALUE;
public static int indexInterval()
{
@@ -185,21 +186,37 @@
FileDeletingReference phantomReference;
private final MappedByteBuffer indexBuffer;
- private final MappedByteBuffer buffer;
+ private final MappedByteBuffer[] buffers; // jvm can only map up to 2GB at
a time
- public static ConcurrentLinkedHashMap<DecoratedKey, Long>
createKeyCache(int size)
+ public static ConcurrentLinkedHashMap<DecoratedKey, PositionSize>
createKeyCache(int size)
{
return
ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE,
size);
}
- private ConcurrentLinkedHashMap<DecoratedKey, Long> keyCache;
+ private ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache;
- SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition>
indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey,
Long> keyCache)
+ SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition>
indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey,
PositionSize> keyCache)
+ throws IOException
{
super(filename, partitioner);
indexBuffer = mmap(indexFilename());
- buffer = mmap(path); // TODO
System.getProperty("os.arch").contains("64") ? mmap(path) : null;
+ if (DatabaseDescriptor.getDiskAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
+ {
+ int bufferCount = 1 + (int) (new File(path).length() /
BUFFER_SIZE);
+ buffers = new MappedByteBuffer[bufferCount];
+ long remaining = length();
+ for (int i = 0; i < bufferCount; i++)
+ {
+ buffers[i] = mmap(path, i * BUFFER_SIZE, (int)
Math.min(remaining, BUFFER_SIZE));
+ remaining -= BUFFER_SIZE;
+ }
+ }
+ else
+ {
+ assert DatabaseDescriptor.getDiskAccessMode() ==
DatabaseDescriptor.DiskAccessMode.standard;
+ buffers = null;
+ }
this.indexPositions = indexPositions;
this.bf = bloomFilter;
@@ -209,7 +226,7 @@
this.keyCache = keyCache;
}
- private static MappedByteBuffer mmap(String filename)
+ private static MappedByteBuffer mmap(String filename, int start, int size)
throws IOException
{
RandomAccessFile raf;
try
@@ -220,17 +237,29 @@
{
throw new IOError(e);
}
+
+ if (size < 0)
+ {
+ if (raf.length() > Integer.MAX_VALUE)
+ throw new UnsupportedOperationException("File " + filename + "
is too large to map in its entirety");
+ size = (int) raf.length();
+ }
try
{
- return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0,
raf.length());
+ return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start,
size);
}
- catch (IOException e)
+ finally
{
- throw new IOError(e);
+ raf.close();
}
}
- private SSTableReader(String filename, IPartitioner partitioner)
+ private static MappedByteBuffer mmap(String filename) throws IOException
+ {
+ return mmap(filename, 0, -1);
+ }
+
+ private SSTableReader(String filename, IPartitioner partitioner) throws
IOException
{
this(filename, partitioner, null, null, null);
}
@@ -299,13 +328,13 @@
/**
* returns the position in the data file to find the given key, or -1 if
the key is not present
*/
- public long getPosition(DecoratedKey decoratedKey) throws IOException
+ public PositionSize getPosition(DecoratedKey decoratedKey) throws
IOException
{
if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
- return -1;
+ return null;
if (keyCache != null)
{
- Long cachedPosition = keyCache.get(decoratedKey);
+ PositionSize cachedPosition = keyCache.get(decoratedKey);
if (cachedPosition != null)
{
return cachedPosition;
@@ -314,7 +343,7 @@
long start = getIndexScanPosition(decoratedKey);
if (start < 0)
{
- return -1;
+ return null;
}
FileDataInput input = new MappedFileDataInput(indexBuffer,
indexFilename());
@@ -329,20 +358,31 @@
}
catch (EOFException e)
{
- return -1;
+ return null;
}
long position = input.readLong();
int v = indexDecoratedKey.compareTo(decoratedKey);
if (v == 0)
{
+ PositionSize info;
+ if (input.getFilePointer() < input.length())
+ {
+ int utflen = input.readUnsignedShort();
+ input.skipBytes(utflen);
+ info = new PositionSize(position, input.readLong() -
position);
+ }
+ else
+ {
+ info = new PositionSize(position, length() - position);
+ }
if (keyCache != null)
- keyCache.put(decoratedKey, position);
- return position;
+ keyCache.put(decoratedKey, info);
+ return info;
}
if (v > 0)
- return -1;
+ return null;
} while (++i < INDEX_INTERVAL);
- return -1;
+ return null;
}
/** like getPosition, but if key is not found will return the location of
the first key _greater_ than the desired one, or -1 if no such key exists. */
@@ -395,7 +435,10 @@
if (logger.isDebugEnabled())
logger.debug("Marking " + path + " compacted");
openedFiles.remove(path);
- new File(compactedFilename()).createNewFile();
+ if (!new File(compactedFilename()).createNewFile())
+ {
+ throw new IOException("Unable to create compaction marker");
+ }
phantomReference.deleteOnCleanup();
}
@@ -422,20 +465,27 @@
public SSTableScanner getScanner(int bufferSize) throws IOException
{
- FileDataInput fdi = getFileDataInput(bufferSize);
- return new SSTableScanner(this, fdi);
+ return new SSTableScanner(this, bufferSize);
}
- public FileDataInput getFileDataInput(int bufferSize)
+ public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int
bufferSize) throws IOException
{
- try
- {
- return buffer == null ? new BufferedRandomAccessFile(path, "r",
bufferSize) : new MappedFileDataInput(buffer, path);
- }
- catch (FileNotFoundException e)
+ PositionSize info = getPosition(decoratedKey);
+ if (info == null)
+ return null;
+
+ if (buffers == null || (bufferIndex(info.position) !=
bufferIndex(info.position + info.size)))
{
- throw new AssertionError(e);
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(path,
"r", bufferSize);
+ file.seek(info.position);
+ return file;
}
+ return new MappedFileDataInput(buffers[bufferIndex(info.position)],
path, (int) (info.position % BUFFER_SIZE));
+ }
+
+ private int bufferIndex(long position)
+ {
+ return (int) (position / BUFFER_SIZE);
}
public AbstractType getColumnComparator()
@@ -526,7 +576,36 @@
{
if (deleteOnCleanup)
{
- SSTable.delete(path);
+ // this is tricky because the mmapping might not have been
finalized yet,
+ // and delete will fail until it is. additionally, we need to make
sure to
+ // delete the data file first, so on restart the others will be
recognized as GCable
+ // even if the compaction file deletion occurs next.
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ File datafile = new File(path);
+ for (int i = 0; i < DeletionService.MAX_RETRIES; i++)
+ {
+ if (datafile.delete())
+ break;
+ try
+ {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+ if (datafile.exists())
+ throw new RuntimeException("Unable to delete " + path);
+ SSTable.logger.info("Deleted " + path);
+
DeletionService.submitDeleteWithRetry(SSTable.indexFilename(path));
+
DeletionService.submitDeleteWithRetry(SSTable.filterFilename(path));
+
DeletionService.submitDeleteWithRetry(SSTable.compactedFilename(path));
+ }
+ }).start();
}
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
Thu Dec 31 21:32:13 2009
@@ -43,11 +43,10 @@
/**
* @param sstable SSTable to scan.
- * @param bufferSize Number of bytes to buffer the file while scanning.
*/
- SSTableScanner(SSTableReader sstable, FileDataInput file) throws
IOException
+ SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
{
- this.file = file;
+ this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r",
bufferSize);
this.sstable = sstable;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
Thu Dec 31 21:32:13 2009
@@ -138,7 +138,7 @@
rename(filterFilename());
path = rename(path); // important to do this last since index & filter
file names are derived from it
- ConcurrentLinkedHashMap<DecoratedKey, Long> keyCache = cacheFraction > 0
+ ConcurrentLinkedHashMap<DecoratedKey, SSTableReader.PositionSize>
keyCache = cacheFraction > 0
?
SSTableReader.createKeyCache((int) (cacheFraction * keysWritten))
: null;
return new SSTableReader(path, partitioner, indexPositions, bf,
keyCache);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
Thu Dec 31 21:32:13 2009
@@ -7,26 +7,34 @@
{
private final MappedByteBuffer buffer;
private final String filename;
- private final long length;
private int position;
+ private long fileLength;
public MappedFileDataInput(MappedByteBuffer buffer, String filename)
{
+ this(buffer, filename, 0);
+ }
+
+ public MappedFileDataInput(MappedByteBuffer buffer, String filename, int
position)
+ {
assert buffer != null;
this.buffer = buffer;
this.filename = filename;
- length = new File(filename).length();
+ this.position = position;
+ assert (fileLength = new File(filename).length()) >= 0; // hack to
only initialize fL when assertions are enabled
}
public void seek(long pos) throws IOException
{
- assert pos <= Integer.MAX_VALUE; // TODO chunk file into 2GB buffers
+ assert pos <= Integer.MAX_VALUE;
+ assert buffer.capacity() == fileLength; // calling this does not make
sense on a mapped chunk of a larger file
position = (int) pos;
}
public long length() throws IOException
{
- return length;
+ assert buffer.capacity() == fileLength; // calling this does not make
sense on a mapped chunk of a larger file
+ return buffer.capacity();
}
public long getFilePointer()
@@ -41,7 +49,7 @@
public int read() throws IOException
{
- if (position == length)
+ if (position == length())
return -1;
return buffer.get(position++) & 0xFF;
}
@@ -50,9 +58,10 @@
{
if (n <= 0)
return 0;
- long oldPosition = position;
- position = (int) Math.min(length(), position + n); // TODO fix > 2GB
bug
- return (int) (position - oldPosition);
+ int oldPosition = position;
+ assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
+ position = Math.min(buffer.capacity(), position + n);
+ return position - oldPosition;
}
/*
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Thu Dec 31 21:32:13
2009
@@ -38,6 +38,7 @@
<DataFileDirectory>build/test/cassandra/data</DataFileDirectory>
</DataFileDirectories>
<BootstrapFileDirectory>build/test/cassandra/bootstrap</BootstrapFileDirectory>
+ <DiskAccessMode>mmap</DiskAccessMode>
<StagingFileDirectory>build/test/cassandra/staging</StagingFileDirectory>
<MemtableSizeInMB>1</MemtableSizeInMB>
<MemtableObjectCountInMillions>0.00002</MemtableObjectCountInMillions> <!--
20 -->
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Thu Dec 31 21:32:13 2009
@@ -40,6 +40,7 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
public class TableTest extends CleanupHelper
{
@@ -359,9 +360,9 @@
}
SSTableReader sstable = cfStore.getSSTables().iterator().next();
DecoratedKey decKey = sstable.getPartitioner().decorateKey(key);
- long position = sstable.getPosition(decKey);
+ SSTable.PositionSize info = sstable.getPosition(decKey);
BufferedRandomAccessFile file = new
BufferedRandomAccessFile(sstable.getFilename(), "r");
- file.seek(position);
+ file.seek(info.position);
assert file.readUTF().equals(key);
file.readInt();
IndexHelper.skipBloomFilter(file);
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
Thu Dec 31 21:32:13 2009
@@ -52,7 +52,7 @@
private void verifySingle(SSTableReader sstable, byte[] bytes, String key)
throws IOException
{
BufferedRandomAccessFile file = new
BufferedRandomAccessFile(sstable.path, "r");
- file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
assert key.equals(file.readUTF());
int size = file.readInt();
byte[] bytes2 = new byte[size];
@@ -84,7 +84,7 @@
BufferedRandomAccessFile file = new
BufferedRandomAccessFile(sstable.path, "r");
for (String key : keys)
{
-
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
assert key.equals(file.readUTF());
int size = file.readInt();
byte[] bytes2 = new byte[size];