Author: jbellis
Date: Thu Jan  7 02:57:14 2010
New Revision: 896742

URL: http://svn.apache.org/viewvc?rev=896742&view=rev
Log:
add support for multiple mmapped index segments, and add mmap_index_only option
patch by jbellis; tested by Brandon Williams for CASSANDRA-669

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Jan  7 02:57:14 2010
@@ -229,9 +229,11 @@
    ~ Access mode.  mmapped i/o is substantially faster, but only practical on
    ~ a 64bit machine (which notably does not include EC2 "small" instances)
    ~ or relatively small datasets.  "auto", the safe choice, will enable
-   ~ mmapping on a 64bit JVM.  Other values are "mmap" and "standard" if you
-   ~ need to force those modes.  (The buffer size settings that follow only
-   ~ apply to standard, non-mmapped i/o.)
+   ~ mmapping on a 64bit JVM.  Other values are "mmap", "mmap_index_only"
+   ~ (which may allow you to get part of the benefits of mmap on a 32bit
+   ~ machine by mmapping only index files) and "standard".
+   ~ (The buffer size settings that follow only apply to standard,
+   ~ non-mmapped i/o.)
    -->
   <DiskAccessMode>auto</DiskAccessMode>
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Jan  7 02:57:14 2010
@@ -53,6 +53,7 @@
     public static enum DiskAccessMode {
         auto,
         mmap,
+        mmap_index_only,
         standard,
     }
 
@@ -128,6 +129,7 @@
     private static int commitLogSyncPeriodMS_;
 
     private static DiskAccessMode diskAccessMode_;
+    private static DiskAccessMode indexAccessMode_;
 
     private static boolean snapshotBeforeCompaction_;
     private static boolean autoBootstrap_ = false;
@@ -198,13 +200,23 @@
             }
             catch (IllegalArgumentException e)
             {
-                throw new ConfigurationException("DiskAccessMode must be 
either 'auto', or 'mmap', or 'standard'");
+                throw new ConfigurationException("DiskAccessMode must be 
either 'auto', 'mmap', 'mmap_index_only', or 'standard'");
             }
             if (diskAccessMode_ == DiskAccessMode.auto)
             {
                 diskAccessMode_ = System.getProperty("os.arch").contains("64") 
? DiskAccessMode.mmap : DiskAccessMode.standard;
+                indexAccessMode_ = diskAccessMode_;
                 logger_.info("Auto DiskAccessMode determined to be " + 
diskAccessMode_);
             }
+            else if (diskAccessMode_ == DiskAccessMode.mmap_index_only)
+            {
+                diskAccessMode_ = DiskAccessMode.standard;
+                indexAccessMode_ = DiskAccessMode.mmap;
+            }
+            else
+            {
+                indexAccessMode_ = diskAccessMode_;
+            }
 
             /* Hashing strategy */
             String partitionerClassName = 
xmlUtils.getNodeValue("/Storage/Partitioner");
@@ -983,6 +995,11 @@
         return diskAccessMode_;
     }
 
+    public static DiskAccessMode getIndexAccessMode()
+    {
+        return indexAccessMode_;
+    }
+
     public static double getFlushDataBufferSizeInMB()
     {
         return flushDataBufferSizeInMB_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896742&r1=896741&r2=896742&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan  7 02:57:14 2010
@@ -90,7 +90,7 @@
         };
         new Thread(runnable, "SSTABLE-DELETER").start();
     }};
-    private static final int BUFFER_SIZE = Integer.MAX_VALUE;
+    private static final long BUFFER_SIZE = Integer.MAX_VALUE;
 
     public static int indexInterval()
     {
@@ -185,8 +185,9 @@
     }
 
     FileDeletingReference phantomReference;
-    private final MappedByteBuffer indexBuffer;
-    private final MappedByteBuffer[] buffers; // jvm can only map up to 2GB at 
a time
+    // jvm can only map up to 2GB at a time, so we split index/data into 
segments of that size when using mmap i/o
+    private final MappedByteBuffer[] indexBuffers;
+    private final MappedByteBuffer[] buffers;
 
 
     public static ConcurrentLinkedHashMap<DecoratedKey, PositionSize> 
createKeyCache(int size)
@@ -204,7 +205,25 @@
             throws IOException
     {
         super(filename, partitioner);
-        indexBuffer = mmap(indexFilename());
+
+        if (DatabaseDescriptor.getIndexAccessMode() == 
DatabaseDescriptor.DiskAccessMode.mmap)
+        {
+            long indexLength = new File(indexFilename()).length();
+            int bufferCount = 1 + (int) (indexLength / BUFFER_SIZE);
+            indexBuffers = new MappedByteBuffer[bufferCount];
+            long remaining = indexLength;
+            for (int i = 0; i < bufferCount; i++)
+            {
+                indexBuffers[i] = mmap(indexFilename(), i * BUFFER_SIZE, (int) 
Math.min(remaining, BUFFER_SIZE));
+                remaining -= BUFFER_SIZE;
+            }
+        }
+        else
+        {
+            assert DatabaseDescriptor.getIndexAccessMode() == 
DatabaseDescriptor.DiskAccessMode.standard;
+            indexBuffers = null;
+        }
+
         if (DatabaseDescriptor.getDiskAccessMode() == 
DatabaseDescriptor.DiskAccessMode.mmap)
         {
             int bufferCount = 1 + (int) (new File(path).length() / 
BUFFER_SIZE);
@@ -231,7 +250,7 @@
         this.keyCache = keyCache;
     }
 
-    private static MappedByteBuffer mmap(String filename, int start, int size) 
throws IOException
+    private static MappedByteBuffer mmap(String filename, long start, int 
size) throws IOException
     {
         RandomAccessFile raf;
         try
@@ -243,12 +262,6 @@
             throw new IOError(e);
         }
 
-        if (size < 0)
-        {
-            if (raf.length() > Integer.MAX_VALUE)
-                throw new UnsupportedOperationException("File " + filename + " 
is too large to map in its entirety");
-            size = (int) raf.length();
-        }
         try
         {
             return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, 
size);
@@ -259,11 +272,6 @@
         }
     }
 
-    private static MappedByteBuffer mmap(String filename) throws IOException
-    {
-        return mmap(filename, 0, -1);
-    }
-
     private SSTableReader(String filename, IPartitioner partitioner) throws 
IOException
     {
         this(filename, partitioner, null, null, null, null);
@@ -383,42 +391,59 @@
                 return info;
         }
 
-        FileDataInput input = new MappedFileDataInput(indexBuffer, 
indexFilename());
-        input.seek(sampledPosition.position);
-        int i = 0;
-        do
+        long p = sampledPosition.position;
+        FileDataInput input;
+        if (indexBuffers == null)
         {
-            DecoratedKey indexDecoratedKey;
-            try
-            {
-                indexDecoratedKey = 
partitioner.convertFromDiskFormat(input.readUTF());
-            }
-            catch (EOFException e)
-            {
-                return null;
-            }
-            long position = input.readLong();
-            int v = indexDecoratedKey.compareTo(decoratedKey);
-            if (v == 0)
+            input = new BufferedRandomAccessFile(indexFilename(), "r");
+            input.seek(p);
+        }
+        else
+        {
+            input = new MappedFileDataInput(indexBuffers[bufferIndex(p)], 
indexFilename(), (int)(p % BUFFER_SIZE));
+        }
+        try
+        {
+            int i = 0;
+            do
             {
-                PositionSize info;
-                if (input.getFilePointer() < input.length())
+                DecoratedKey indexDecoratedKey;
+                try
                 {
-                    int utflen = input.readUnsignedShort();
-                    input.skipBytes(utflen);
-                    info = new PositionSize(position, input.readLong() - 
position);
+                    indexDecoratedKey = 
partitioner.convertFromDiskFormat(input.readUTF());
                 }
-                else
+                catch (EOFException e)
                 {
-                    info = new PositionSize(position, length() - position);
+                    return null;
                 }
-                if (keyCache != null)
-                    keyCache.put(decoratedKey, info);
-                return info;
-            }
-            if (v > 0)
-                return null;
-        } while  (++i < INDEX_INTERVAL);
+                long position = input.readLong();
+                int v = indexDecoratedKey.compareTo(decoratedKey);
+                if (v == 0)
+                {
+                    PositionSize info;
+                    if (input.getFilePointer() < input.length())
+                    {
+                        int utflen = input.readUnsignedShort();
+                        if (utflen != input.skipBytes(utflen))
+                            throw new EOFException();
+                        info = new PositionSize(position, input.readLong() - 
position);
+                    }
+                    else
+                    {
+                        info = new PositionSize(position, length() - position);
+                    }
+                    if (keyCache != null)
+                        keyCache.put(decoratedKey, info);
+                    return info;
+                }
+                if (v > 0)
+                    return null;
+            } while  (++i < INDEX_INTERVAL);
+        }
+        finally
+        {
+            input.close();
+        }
         return null;
     }
 
@@ -431,25 +456,9 @@
             return 0;
         }
 
-        // by default, we plan to start scanning at the nearest bsearched 
index entry
-        long start = sampledPosition.position;
-        if (spannedIndexDataPositions != null)
-        {
-            // check if the index entry spans a mmap segment boundary
-            PositionSize info = spannedIndexDataPositions.get(sampledPosition);
-            if (info != null)
-            {
-                // if the key matches the index entry we don't have to scan 
the index after all
-                if (sampledPosition.key.compareTo(decoratedKey) == 0)
-                    return info.position;
-                // otherwise, start scanning at the next entry (which won't 
span a boundary;
-                // if it did it would have been in the index sample and we 
would have started with that instead)
-                start = info.position + sampledPosition.key.serializedSize() + 
(Long.SIZE / 8);
-            }
-        }
-
+        // can't use a MappedFileDataInput here, since we might cross a 
segment boundary while scanning
         BufferedRandomAccessFile input = new 
BufferedRandomAccessFile(indexFilename(path), "r");
-        input.seek(start);
+        input.seek(sampledPosition.position);
         try
         {
             while (true)


Reply via email to