Author: jbellis
Date: Thu Dec 31 21:32:13 2009
New Revision: 894944

URL: http://svn.apache.org/viewvc?rev=894944&view=rev
Log:
productize mmap approach: handle files > 2GB by chunking w/ fallback to BRAF
when a row crosses chunk boundaries (you don't want to have to check for
crossing a boundary in each read() call, or you'll almost certainly waste more
time than the BRAF approach); add retrying-delete to wait for mmapped files to
be unmapped by finalizer after compaction

patch by jbellis; reviewed by Brandon Williams and goffinet for CASSANDRA-408

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Thu Dec 31 21:32:13 2009
@@ -226,6 +226,16 @@
   <!--======================================================================-->
 
   <!--
+   ~ Access mode.  mmapped i/o is substantially faster, but only practical on
+   ~ a 64bit machine (which notably does not include EC2 "small" instances)
+   ~ or relatively small datasets.  "auto", the safe choice, will enable
+   ~ mmapping on a 64bit JVM.  Other values are "mmap" and "standard" if you
+   ~ need to force those modes.  (The buffer size settings that follow only
+   ~ apply to standard, non-mmapped i/o.)
+   -->
+  <DiskAccessMode>auto</DiskAccessMode>
+
+  <!--
    ~ Buffer size to use when performing contiguous column slices. Increase
    ~ this to the size of the column slices you typically perform. 
    ~ (Name-based queries are performed with a buffer size of 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
 Thu Dec 31 21:32:13 2009
@@ -48,7 +48,13 @@
     public static enum CommitLogSync {
         periodic,
         batch
-    };
+    }
+
+    public static enum DiskAccessMode {
+        auto,
+        mmap,
+        standard,
+    }
 
     public static final String random_ = "RANDOM";
     public static final String ophf_ = "OPHF";
@@ -121,6 +127,8 @@
     private static double commitLogSyncBatchMS_;
     private static int commitLogSyncPeriodMS_;
 
+    private static DiskAccessMode diskAccessMode_;
+
     private static boolean snapshotBeforeCompaction_;
     private static boolean autoBootstrap_ = false;
 
@@ -183,6 +191,21 @@
                 logger_.debug("Syncing log with a period of " + 
commitLogSyncPeriodMS_);
             }
 
+            String modeRaw = xmlUtils.getNodeValue("/Storage/DiskAccessMode");
+            try
+            {
+                diskAccessMode_ = DiskAccessMode.valueOf(modeRaw);
+            }
+            catch (IllegalArgumentException e)
+            {
+                throw new ConfigurationException("DiskAccessMode must be 
either 'auto', or 'mmap', or 'standard'");
+            }
+            if (diskAccessMode_ == DiskAccessMode.auto)
+            {
+                diskAccessMode_ = System.getProperty("os.arch").contains("64") 
? DiskAccessMode.mmap : DiskAccessMode.standard;
+                logger_.info("Auto DiskAccessMode determined to be " + 
diskAccessMode_);
+            }
+
             /* Hashing strategy */
             String partitionerClassName = 
xmlUtils.getNodeValue("/Storage/Partitioner");
             if (partitionerClassName == null)
@@ -955,6 +978,11 @@
         return commitLogSync_;
     }
 
+    public static DiskAccessMode getDiskAccessMode()
+    {
+        return diskAccessMode_;
+    }
+
     public static double getFlushDataBufferSizeInMB()
     {
         return flushDataBufferSizeInMB_;

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java 
Thu Dec 31 21:32:13 2009
@@ -557,7 +557,7 @@
             if (header.isSafeToDelete())
             {
                 logger_.info("Deleting obsolete commit log:" + oldFile);
-                DeletionService.deleteAsync(oldFile);
+                DeletionService.submitDelete(oldFile);
                 clHeaders_.remove(oldFile);
             }
             else

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
 Thu Dec 31 21:32:13 2009
@@ -46,15 +46,12 @@
         this.columns = columnNames;
 
         DecoratedKey decoratedKey = ssTable.getPartitioner().decorateKey(key);
-        long position = ssTable.getPosition(decoratedKey);
-        if (position < 0)
-            return;
 
-        FileDataInput file = 
ssTable.getFileDataInput(DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 
1024);
+        FileDataInput file = ssTable.getFileDataInput(decoratedKey, 
DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
+        if (file == null)
+            return;
         try
         {
-            file.seek(position);
-
             DecoratedKey keyInDisk = 
ssTable.getPartitioner().convertFromDiskFormat(file.readUTF());
             assert keyInDisk.equals(decoratedKey) : keyInDisk;
             file.readInt(); // data size

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
 Thu Dec 31 21:32:13 2009
@@ -29,7 +29,6 @@
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import com.google.common.collect.AbstractIterator;
@@ -52,12 +51,12 @@
 
         /* Morph key into actual key based on the partition type. */
         DecoratedKey decoratedKey = ssTable.getPartitioner().decorateKey(key);
-        long position = ssTable.getPosition(decoratedKey);
+        FileDataInput fdi = ssTable.getFileDataInput(decoratedKey, 
DatabaseDescriptor.getSlicedReadBufferSizeInKB() * 1024);
         this.comparator = ssTable.getColumnComparator();
         this.startColumn = startColumn;
         this.finishColumn = finishColumn;
-        if (position >= 0)
-            reader = new ColumnGroupReader(ssTable, decoratedKey, position);
+        if (fdi != null)
+            reader = new ColumnGroupReader(ssTable, decoratedKey, fdi);
     }
 
     private boolean isColumnNeeded(IColumn column)
@@ -120,11 +119,10 @@
         private int curRangeIndex;
         private Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
 
-        public ColumnGroupReader(SSTableReader ssTable, DecoratedKey key, long 
position) throws IOException
+        public ColumnGroupReader(SSTableReader ssTable, DecoratedKey key, 
FileDataInput input) throws IOException
         {
-            this.file = 
ssTable.getFileDataInput(DatabaseDescriptor.getSlicedReadBufferSizeInKB() * 
1024);
+            this.file = input;
 
-            file.seek(position);
             DecoratedKey keyInDisk = 
ssTable.getPartitioner().convertFromDiskFormat(file.readUTF());
             assert keyInDisk.equals(key);
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/DeletionService.java 
Thu Dec 31 21:32:13 2009
@@ -10,9 +10,11 @@
 
 public class DeletionService
 {
+    public static final int MAX_RETRIES = 10;
+
     public static final ExecutorService executor = new 
JMXEnabledThreadPoolExecutor("FILEUTILS-DELETE-POOL");
 
-    public static void deleteAsync(final String file) throws IOException
+    public static void submitDelete(final String file)
     {
         Runnable deleter = new WrappedRunnable()
         {
@@ -24,4 +26,41 @@
         };
         executor.submit(deleter);
     }
+
+    public static void submitDeleteWithRetry(String file)
+    {
+        submitDeleteWithRetry(file, 0);
+    }
+
+    private static void submitDeleteWithRetry(final String file, final int 
retryCount)
+    {
+        Runnable deleter = new WrappedRunnable()
+        {
+            @Override
+            protected void runMayThrow() throws IOException
+            {
+                if (!new File(file).delete())
+                {
+                    if (retryCount > MAX_RETRIES)
+                        throw new IOException("Unable to delete " + file + " 
after " + MAX_RETRIES + " tries");
+                    new Thread(new Runnable()
+                    {
+                        public void run()
+                        {
+                            try
+                            {
+                                Thread.sleep(10000);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new AssertionError(e);
+                            }
+                            submitDeleteWithRetry(file, retryCount + 1);
+                        }
+                    }).start();
+                }
+            }
+        };
+        executor.submit(deleter);
+    }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu 
Dec 31 21:32:13 2009
@@ -48,7 +48,7 @@
  */
 public abstract class SSTable
 {
-    private static final Logger logger = Logger.getLogger(SSTable.class);
+    static final Logger logger = Logger.getLogger(SSTable.class);
 
     public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
 
@@ -102,7 +102,11 @@
     {
         if (new File(compactedFilename(dataFilename)).exists())
         {
-            delete(dataFilename);
+            FileUtils.deleteWithConfirm(new File(dataFilename));
+            FileUtils.deleteWithConfirm(new 
File(SSTable.indexFilename(dataFilename)));
+            FileUtils.deleteWithConfirm(new 
File(SSTable.filterFilename(dataFilename)));
+            FileUtils.deleteWithConfirm(new 
File(SSTable.compactedFilename(dataFilename)));
+            logger.info("Deleted " + dataFilename);
             return true;
         }
         return false;
@@ -152,15 +156,6 @@
         return new File(filename).getParentFile().getName();        
     }
 
-    static void delete(String path) throws IOException
-    {
-        FileUtils.deleteWithConfirm(new File(path));
-        FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
-        FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
-        FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
-        logger.info("Deleted " + path);
-    }
-
     public static long getTotalBytes(Iterable<SSTableReader> sstables)
     {
         long sum = 0;
@@ -215,4 +210,16 @@
                "path='" + path + '\'' +
                ')';
     }
+
+    public static class PositionSize
+    {
+        public final long position;
+        public final long size;
+
+        public PositionSize(long position, long size)
+        {
+            this.position = position;
+            this.size = size;
+        }
+    }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java 
Thu Dec 31 21:32:13 2009
@@ -90,6 +90,7 @@
         };
         new Thread(runnable, "SSTABLE-DELETER").start();
     }};
+    private final int BUFFER_SIZE = Integer.MAX_VALUE;
 
     public static int indexInterval()
     {
@@ -185,21 +186,37 @@
 
     FileDeletingReference phantomReference;
     private final MappedByteBuffer indexBuffer;
-    private final MappedByteBuffer buffer;
+    private final MappedByteBuffer[] buffers; // jvm can only map up to 2GB at 
a time
 
 
-    public static ConcurrentLinkedHashMap<DecoratedKey, Long> 
createKeyCache(int size)
+    public static ConcurrentLinkedHashMap<DecoratedKey, PositionSize> 
createKeyCache(int size)
     {
         return 
ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE,
 size);
     }
 
-    private ConcurrentLinkedHashMap<DecoratedKey, Long> keyCache;
+    private ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache;
 
-    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> 
indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey, 
Long> keyCache)
+    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> 
indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey, 
PositionSize> keyCache)
+            throws IOException
     {
         super(filename, partitioner);
         indexBuffer = mmap(indexFilename());
-        buffer = mmap(path); // TODO 
System.getProperty("os.arch").contains("64") ? mmap(path) : null;
+        if (DatabaseDescriptor.getDiskAccessMode() == 
DatabaseDescriptor.DiskAccessMode.mmap)
+        {
+            int bufferCount = 1 + (int) (new File(path).length() / 
BUFFER_SIZE);
+            buffers = new MappedByteBuffer[bufferCount];
+            long remaining = length();
+            for (int i = 0; i < bufferCount; i++)
+            {
+                buffers[i] = mmap(path, i * BUFFER_SIZE, (int) 
Math.min(remaining, BUFFER_SIZE));
+                remaining -= BUFFER_SIZE;
+            }
+        }
+        else
+        {
+            assert DatabaseDescriptor.getDiskAccessMode() == 
DatabaseDescriptor.DiskAccessMode.standard;
+            buffers = null;
+        }
 
         this.indexPositions = indexPositions;
         this.bf = bloomFilter;
@@ -209,7 +226,7 @@
         this.keyCache = keyCache;
     }
 
-    private static MappedByteBuffer mmap(String filename)
+    private static MappedByteBuffer mmap(String filename, int start, int size) 
throws IOException
     {
         RandomAccessFile raf;
         try
@@ -220,17 +237,29 @@
         {
             throw new IOError(e);
         }
+
+        if (size < 0)
+        {
+            if (raf.length() > Integer.MAX_VALUE)
+                throw new UnsupportedOperationException("File " + filename + " 
is too large to map in its entirety");
+            size = (int) raf.length();
+        }
         try
         {
-            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, 
raf.length());
+            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, 
size);
         }
-        catch (IOException e)
+        finally
         {
-            throw new IOError(e);
+            raf.close();
         }
     }
 
-    private SSTableReader(String filename, IPartitioner partitioner)
+    private static MappedByteBuffer mmap(String filename) throws IOException
+    {
+        return mmap(filename, 0, -1);
+    }
+
+    private SSTableReader(String filename, IPartitioner partitioner) throws 
IOException
     {
         this(filename, partitioner, null, null, null);
     }
@@ -299,13 +328,13 @@
     /**
      * returns the position in the data file to find the given key, or -1 if 
the key is not present
      */
-    public long getPosition(DecoratedKey decoratedKey) throws IOException
+    public PositionSize getPosition(DecoratedKey decoratedKey) throws 
IOException
     {
         if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
-            return -1;
+            return null;
         if (keyCache != null)
         {
-            Long cachedPosition = keyCache.get(decoratedKey);
+            PositionSize cachedPosition = keyCache.get(decoratedKey);
             if (cachedPosition != null)
             {
                 return cachedPosition;
@@ -314,7 +343,7 @@
         long start = getIndexScanPosition(decoratedKey);
         if (start < 0)
         {
-            return -1;
+            return null;
         }
 
         FileDataInput input = new MappedFileDataInput(indexBuffer, 
indexFilename());
@@ -329,20 +358,31 @@
             }
             catch (EOFException e)
             {
-                return -1;
+                return null;
             }
             long position = input.readLong();
             int v = indexDecoratedKey.compareTo(decoratedKey);
             if (v == 0)
             {
+                PositionSize info;
+                if (input.getFilePointer() < input.length())
+                {
+                    int utflen = input.readUnsignedShort();
+                    input.skipBytes(utflen);
+                    info = new PositionSize(position, input.readLong() - 
position);
+                }
+                else
+                {
+                    info = new PositionSize(position, length() - position);
+                }
                 if (keyCache != null)
-                    keyCache.put(decoratedKey, position);
-                return position;
+                    keyCache.put(decoratedKey, info);
+                return info;
             }
             if (v > 0)
-                return -1;
+                return null;
         } while  (++i < INDEX_INTERVAL);
-        return -1;
+        return null;
     }
 
     /** like getPosition, but if key is not found will return the location of 
the first key _greater_ than the desired one, or -1 if no such key exists. */
@@ -395,7 +435,10 @@
         if (logger.isDebugEnabled())
             logger.debug("Marking " + path + " compacted");
         openedFiles.remove(path);
-        new File(compactedFilename()).createNewFile();
+        if (!new File(compactedFilename()).createNewFile())
+        {
+            throw new IOException("Unable to create compaction marker");
+        }
         phantomReference.deleteOnCleanup();
     }
 
@@ -422,20 +465,27 @@
 
     public SSTableScanner getScanner(int bufferSize) throws IOException
     {
-        FileDataInput fdi = getFileDataInput(bufferSize);
-        return new SSTableScanner(this, fdi);
+        return new SSTableScanner(this, bufferSize);
     }
 
-    public FileDataInput getFileDataInput(int bufferSize)
+    public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int 
bufferSize) throws IOException
     {
-        try
-        {
-            return buffer == null ? new BufferedRandomAccessFile(path, "r", 
bufferSize) : new MappedFileDataInput(buffer, path);
-        }
-        catch (FileNotFoundException e)
+        PositionSize info = getPosition(decoratedKey);
+        if (info == null)
+            return null;
+
+        if (buffers == null || (bufferIndex(info.position) != 
bufferIndex(info.position + info.size)))
         {
-            throw new AssertionError(e);
+            BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, 
"r", bufferSize);
+            file.seek(info.position);
+            return file;
         }
+        return new MappedFileDataInput(buffers[bufferIndex(info.position)], 
path, (int) (info.position % BUFFER_SIZE));
+    }
+
+    private int bufferIndex(long position)
+    {
+        return (int) (position / BUFFER_SIZE);
     }
 
     public AbstractType getColumnComparator()
@@ -526,7 +576,36 @@
     {
         if (deleteOnCleanup)
         {
-            SSTable.delete(path);
+            // this is tricky because the mmapping might not have been 
finalized yet,
+            // and delete will fail until it is.  additionally, we need to make 
sure to
+            // delete the data file first, so on restart the others will be 
recognized as GCable
+            // even if the compaction file deletion occurs next.
+            new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    File datafile = new File(path);
+                    for (int i = 0; i < DeletionService.MAX_RETRIES; i++)
+                    {
+                        if (datafile.delete())
+                            break;
+                        try
+                        {
+                            Thread.sleep(10000);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError(e);
+                        }
+                    }
+                    if (datafile.exists())
+                        throw new RuntimeException("Unable to delete " + path);
+                    SSTable.logger.info("Deleted " + path);
+                    
DeletionService.submitDeleteWithRetry(SSTable.indexFilename(path));
+                    
DeletionService.submitDeleteWithRetry(SSTable.filterFilename(path));
+                    
DeletionService.submitDeleteWithRetry(SSTable.compactedFilename(path));
+                }
+            }).start();
         }
     }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java 
Thu Dec 31 21:32:13 2009
@@ -43,11 +43,10 @@
 
     /**
      * @param sstable SSTable to scan.
-     * @param bufferSize Number of bytes to buffer the file while scanning.
      */
-    SSTableScanner(SSTableReader sstable, FileDataInput file) throws 
IOException
+    SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
     {
-        this.file = file;
+        this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", 
bufferSize);
         this.sstable = sstable;
     }
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java 
Thu Dec 31 21:32:13 2009
@@ -138,7 +138,7 @@
         rename(filterFilename());
         path = rename(path); // important to do this last since index & filter 
file names are derived from it
 
-        ConcurrentLinkedHashMap<DecoratedKey, Long> keyCache = cacheFraction > 0
+        ConcurrentLinkedHashMap<DecoratedKey, SSTableReader.PositionSize> 
keyCache = cacheFraction > 0
                                                         ? 
SSTableReader.createKeyCache((int) (cacheFraction * keysWritten))
                                                         : null;
         return new SSTableReader(path, partitioner, indexPositions, bf, 
keyCache);

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
 Thu Dec 31 21:32:13 2009
@@ -7,26 +7,34 @@
 {
     private final MappedByteBuffer buffer;
     private final String filename;
-    private final long length;
     private int position;
+    private long fileLength;
 
     public MappedFileDataInput(MappedByteBuffer buffer, String filename)
     {
+        this(buffer, filename, 0);
+    }
+
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int 
position)
+    {
         assert buffer != null;
         this.buffer = buffer;
         this.filename = filename;
-        length = new File(filename).length();
+        this.position = position;
+        assert (fileLength = new File(filename).length()) >= 0; // hack to 
only initialize fL when assertions are enabled
     }
 
     public void seek(long pos) throws IOException
     {
-        assert pos <= Integer.MAX_VALUE; // TODO chunk file into 2GB buffers
+        assert pos <= Integer.MAX_VALUE;
+        assert buffer.capacity() == fileLength; // calling this does not make 
sense on a mapped chunk of a larger file
         position = (int) pos;
     }
 
     public long length() throws IOException
     {
-        return length;
+        assert buffer.capacity() == fileLength; // calling this does not make 
sense on a mapped chunk of a larger file
+        return buffer.capacity();
     }
 
     public long getFilePointer()
@@ -41,7 +49,7 @@
 
     public int read() throws IOException
     {
-        if (position == length)
+        if (position == length())
             return -1;
         return buffer.get(position++) & 0xFF;
     }
@@ -50,9 +58,10 @@
     {
         if (n <= 0)
             return 0;
-        long oldPosition = position;
-        position = (int) Math.min(length(), position + n); // TODO fix > 2GB 
bug
-        return (int) (position - oldPosition);
+        int oldPosition = position;
+        assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
+        position = Math.min(buffer.capacity(), position + n);
+        return position - oldPosition;
     }
 
     /*

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Thu Dec 31 21:32:13 
2009
@@ -38,6 +38,7 @@
      <DataFileDirectory>build/test/cassandra/data</DataFileDirectory>
    </DataFileDirectories>
    
<BootstrapFileDirectory>build/test/cassandra/bootstrap</BootstrapFileDirectory>
+   <DiskAccessMode>mmap</DiskAccessMode>
    <StagingFileDirectory>build/test/cassandra/staging</StagingFileDirectory>
    <MemtableSizeInMB>1</MemtableSizeInMB>
    <MemtableObjectCountInMillions>0.00002</MemtableObjectCountInMillions> <!-- 
20 -->

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java 
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java 
Thu Dec 31 21:32:13 2009
@@ -40,6 +40,7 @@
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
 
 public class TableTest extends CleanupHelper
 {
@@ -359,9 +360,9 @@
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         DecoratedKey decKey = sstable.getPartitioner().decorateKey(key);
-        long position = sstable.getPosition(decKey);
+        SSTable.PositionSize info = sstable.getPosition(decKey);
         BufferedRandomAccessFile file = new 
BufferedRandomAccessFile(sstable.getFilename(), "r");
-        file.seek(position);
+        file.seek(info.position);
         assert file.readUTF().equals(key);
         file.readInt();
         IndexHelper.skipBloomFilter(file);

Modified: 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java?rev=894944&r1=894943&r2=894944&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java 
(original)
+++ 
incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableTest.java 
Thu Dec 31 21:32:13 2009
@@ -52,7 +52,7 @@
     private void verifySingle(SSTableReader sstable, byte[] bytes, String key) 
throws IOException
     {
         BufferedRandomAccessFile file = new 
BufferedRandomAccessFile(sstable.path, "r");
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+        
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
         assert key.equals(file.readUTF());
         int size = file.readInt();
         byte[] bytes2 = new byte[size];
@@ -84,7 +84,7 @@
         BufferedRandomAccessFile file = new 
BufferedRandomAccessFile(sstable.path, "r");
         for (String key : keys)
         {
-            
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+            
file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
             assert key.equals(file.readUTF());
             int size = file.readInt();
             byte[] bytes2 = new byte[size];


Reply via email to