Author: jbellis
Date: Thu Jan 7 02:57:14 2010
New Revision: 896742
URL: http://svn.apache.org/viewvc?rev=896742&view=rev
Log:
add support for multiple mmapped index segments, and add mmap_index_only option
patch by jbellis; tested by Brandon Williams for CASSANDRA-669
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Jan 7 02:57:14 2010
@@ -229,9 +229,11 @@
~ Access mode. mmapped i/o is substantially faster, but only practical on
~ a 64bit machine (which notably does not include EC2 "small" instances)
~ or relatively small datasets. "auto", the safe choice, will enable
- ~ mmapping on a 64bit JVM. Other values are "mmap" and "standard" if you
- ~ need to force those modes. (The buffer size settings that follow only
- ~ apply to standard, non-mmapped i/o.)
+ ~ mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only"
+ ~ (which may allow you to get part of the benefits of mmap on a 32bit
+ ~ machine by mmapping only index files) and "standard".
+ ~ (The buffer size settings that follow only apply to standard,
+ ~ non-mmapped i/o.)
-->
<DiskAccessMode>auto</DiskAccessMode>
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jan 7 02:57:14 2010
@@ -53,6 +53,7 @@
public static enum DiskAccessMode {
auto,
mmap,
+ mmap_index_only,
standard,
}
@@ -128,6 +129,7 @@
private static int commitLogSyncPeriodMS_;
private static DiskAccessMode diskAccessMode_;
+ private static DiskAccessMode indexAccessMode_;
private static boolean snapshotBeforeCompaction_;
private static boolean autoBootstrap_ = false;
@@ -198,13 +200,23 @@
}
catch (IllegalArgumentException e)
{
- throw new ConfigurationException("DiskAccessMode must be
either 'auto', or 'mmap', or 'standard'");
+ throw new ConfigurationException("DiskAccessMode must be
either 'auto', 'mmap', 'mmap_index_only', or 'standard'");
}
if (diskAccessMode_ == DiskAccessMode.auto)
{
diskAccessMode_ = System.getProperty("os.arch").contains("64")
? DiskAccessMode.mmap : DiskAccessMode.standard;
+ indexAccessMode_ = diskAccessMode_;
logger_.info("Auto DiskAccessMode determined to be " +
diskAccessMode_);
}
+ else if (diskAccessMode_ == DiskAccessMode.mmap_index_only)
+ {
+ diskAccessMode_ = DiskAccessMode.standard;
+ indexAccessMode_ = DiskAccessMode.mmap;
+ }
+ else
+ {
+ indexAccessMode_ = diskAccessMode_;
+ }
/* Hashing strategy */
String partitionerClassName =
xmlUtils.getNodeValue("/Storage/Partitioner");
@@ -983,6 +995,11 @@
return diskAccessMode_;
}
+ public static DiskAccessMode getIndexAccessMode()
+ {
+ return indexAccessMode_;
+ }
+
public static double getFlushDataBufferSizeInMB()
{
return flushDataBufferSizeInMB_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan 7 02:57:14 2010
@@ -90,7 +90,7 @@
};
new Thread(runnable, "SSTABLE-DELETER").start();
}};
- private static final int BUFFER_SIZE = Integer.MAX_VALUE;
+ private static final long BUFFER_SIZE = Integer.MAX_VALUE;
public static int indexInterval()
{
@@ -185,8 +185,9 @@
}
FileDeletingReference phantomReference;
- private final MappedByteBuffer indexBuffer;
- private final MappedByteBuffer[] buffers; // jvm can only map up to 2GB at a time
+ // jvm can only map up to 2GB at a time, so we split index/data into segments of that size when using mmap i/o
+ private final MappedByteBuffer[] indexBuffers;
+ private final MappedByteBuffer[] buffers;
public static ConcurrentLinkedHashMap<DecoratedKey, PositionSize>
createKeyCache(int size)
@@ -204,7 +205,25 @@
throws IOException
{
super(filename, partitioner);
- indexBuffer = mmap(indexFilename());
+
+ if (DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
+ {
+ long indexLength = new File(indexFilename()).length();
+ int bufferCount = 1 + (int) (indexLength / BUFFER_SIZE);
+ indexBuffers = new MappedByteBuffer[bufferCount];
+ long remaining = indexLength;
+ for (int i = 0; i < bufferCount; i++)
+ {
+ indexBuffers[i] = mmap(indexFilename(), i * BUFFER_SIZE, (int)
Math.min(remaining, BUFFER_SIZE));
+ remaining -= BUFFER_SIZE;
+ }
+ }
+ else
+ {
+ assert DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.standard;
+ indexBuffers = null;
+ }
+
if (DatabaseDescriptor.getDiskAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap)
{
int bufferCount = 1 + (int) (new File(path).length() /
BUFFER_SIZE);
@@ -231,7 +250,7 @@
this.keyCache = keyCache;
}
- private static MappedByteBuffer mmap(String filename, int start, int size)
throws IOException
+ private static MappedByteBuffer mmap(String filename, long start, int
size) throws IOException
{
RandomAccessFile raf;
try
@@ -243,12 +262,6 @@
throw new IOError(e);
}
- if (size < 0)
- {
- if (raf.length() > Integer.MAX_VALUE)
- throw new UnsupportedOperationException("File " + filename + "
is too large to map in its entirety");
- size = (int) raf.length();
- }
try
{
return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start,
size);
@@ -259,11 +272,6 @@
}
}
- private static MappedByteBuffer mmap(String filename) throws IOException
- {
- return mmap(filename, 0, -1);
- }
-
private SSTableReader(String filename, IPartitioner partitioner) throws
IOException
{
this(filename, partitioner, null, null, null, null);
@@ -383,42 +391,59 @@
return info;
}
- FileDataInput input = new MappedFileDataInput(indexBuffer,
indexFilename());
- input.seek(sampledPosition.position);
- int i = 0;
- do
+ long p = sampledPosition.position;
+ FileDataInput input;
+ if (indexBuffers == null)
{
- DecoratedKey indexDecoratedKey;
- try
- {
- indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
- }
- catch (EOFException e)
- {
- return null;
- }
- long position = input.readLong();
- int v = indexDecoratedKey.compareTo(decoratedKey);
- if (v == 0)
+ input = new BufferedRandomAccessFile(indexFilename(), "r");
+ input.seek(p);
+ }
+ else
+ {
+ input = new MappedFileDataInput(indexBuffers[bufferIndex(p)],
indexFilename(), (int)(p % BUFFER_SIZE));
+ }
+ try
+ {
+ int i = 0;
+ do
{
- PositionSize info;
- if (input.getFilePointer() < input.length())
+ DecoratedKey indexDecoratedKey;
+ try
{
- int utflen = input.readUnsignedShort();
- input.skipBytes(utflen);
- info = new PositionSize(position, input.readLong() -
position);
+ indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
}
- else
+ catch (EOFException e)
{
- info = new PositionSize(position, length() - position);
+ return null;
}
- if (keyCache != null)
- keyCache.put(decoratedKey, info);
- return info;
- }
- if (v > 0)
- return null;
- } while (++i < INDEX_INTERVAL);
+ long position = input.readLong();
+ int v = indexDecoratedKey.compareTo(decoratedKey);
+ if (v == 0)
+ {
+ PositionSize info;
+ if (input.getFilePointer() < input.length())
+ {
+ int utflen = input.readUnsignedShort();
+ if (utflen != input.skipBytes(utflen))
+ throw new EOFException();
+ info = new PositionSize(position, input.readLong() -
position);
+ }
+ else
+ {
+ info = new PositionSize(position, length() - position);
+ }
+ if (keyCache != null)
+ keyCache.put(decoratedKey, info);
+ return info;
+ }
+ if (v > 0)
+ return null;
+ } while (++i < INDEX_INTERVAL);
+ }
+ finally
+ {
+ input.close();
+ }
return null;
}
@@ -431,25 +456,9 @@
return 0;
}
- // by default, we plan to start scanning at the nearest bsearched index entry
- long start = sampledPosition.position;
- if (spannedIndexDataPositions != null)
- {
- // check if the index entry spans a mmap segment boundary
- PositionSize info = spannedIndexDataPositions.get(sampledPosition);
- if (info != null)
- {
- // if the key matches the index entry we don't have to scan the index after all
- if (sampledPosition.key.compareTo(decoratedKey) == 0)
- return info.position;
- // otherwise, start scanning at the next entry (which won't span a boundary;
- // if it did it would have been in the index sample and we would have started with that instead)
- start = info.position + sampledPosition.key.serializedSize() +
(Long.SIZE / 8);
- }
- }
-
+ // can't use a MappedFileDataInput here, since we might cross a segment boundary while scanning
BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(path), "r");
- input.seek(start);
+ input.seek(sampledPosition.position);
try
{
while (true)