Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Thu Jul 30 15:30:21 2009
@@ -1,355 +1,355 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-
-/**
- * Provides helpers to serialize, deserialize and use column indexes.
- * Author : Karthik Ranganathan ( [email protected] )
- */
-
-public class IndexHelper
-{
-       /**
-        * Serializes a column index to a data output stream
-        * @param indexSizeInBytes Size of index to be written
-        * @param columnIndexList List of column index entries as objects
-        * @param dos the output stream into which the column index is to be written
-        * @throws IOException
-        */
-       public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException
-       {
-               /* if we have no data to index, then write that there is no index present */
-               if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0)
-               {
-                       dos.writeBoolean(false);
-               }
-               else
-               {
-               /* write if we are storing a column index */
-               dos.writeBoolean(true);
-               /* write the size of the index */
-               dos.writeInt(indexSizeInBytes);
-               for( ColumnIndexInfo cIndexInfo : columnIndexList )
-               {
-                       cIndexInfo.serialize(dos);
-               }
-               }
-       }
-    
-    /**
-     * Skip the bloom filter and the index and return the bytes read.
-     * @param in the data input from which the bloom filter and index 
-     *           should be skipped
-     * @return number of bytes read.
-     * @throws IOException
-     */
-    public static int skipBloomFilterAndIndex(DataInput in) throws IOException
-    {
-        int totalBytesRead = 0;
-        /* size of the bloom filter */
-        int size = in.readInt();
-        totalBytesRead += 4;
-        /* skip the serialized bloom filter */
-        in.skipBytes(size);
-        totalBytesRead += size;
-        /* skip the index on disk */
-        /* read if the file has column indexes */
-        boolean hasColumnIndexes = in.readBoolean();
-        totalBytesRead += 1;
-        if ( hasColumnIndexes )
-        {
-            totalBytesRead += skipIndex(in);
-        }
-        return totalBytesRead;
-    }
-    
-    /**
-     * Skip the bloom filter and return the bytes read.
-     * @param in the data input from which the bloom filter 
-     *           should be skipped
-     * @return number of bytes read.
-     * @throws IOException
-     */
-    public static int skipBloomFilter(DataInput in) throws IOException
-    {
-        int totalBytesRead = 0;
-        /* size of the bloom filter */
-        int size = in.readInt();
-        totalBytesRead += 4;
-        /* skip the serialized bloom filter */
-        in.skipBytes(size);
-        totalBytesRead += size;
-        return totalBytesRead;
-    }
-
-       /**
-        * Skip the index and return the number of bytes read.
-        * @param file the data input from which the index should be skipped
-        * @return number of bytes read from the data input
-        * @throws IOException
-        */
-       public static int skipIndex(DataInput file) throws IOException
-       {
-        /* read only the column index list */
-        int columnIndexSize = file.readInt();
-        int totalBytesRead = 4;
-
-        /* skip the column index data */
-        file.skipBytes(columnIndexSize);
-        totalBytesRead += columnIndexSize;
-
-        return totalBytesRead;
-       }
-    
-    /**
-     * Deserialize the index into a structure and return the number of bytes read.
-     * @param tableName
-     * @param cfName
-     * @param in Input from which the serialized form of the index is read
-     * @param columnIndexList the structure which is filled in with the deserialized index
-     * @return number of bytes read from the input
-     * @throws IOException
-     */
-       static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
-       {
-               /* read only the column index list */
-               int columnIndexSize = in.readInt();
-               int totalBytesRead = 4;
-
-               /* read the indexes into a separate buffer */
-               DataOutputBuffer indexOut = new DataOutputBuffer();
-        /* write the data into buffer */
-               indexOut.write(in, columnIndexSize);
-               totalBytesRead += columnIndexSize;
-
-               /* now deserialize the index list */
-        DataInputBuffer indexIn = new DataInputBuffer();
-        indexIn.reset(indexOut.getData(), indexOut.getLength());
-        
-        AbstractType comparator = DatabaseDescriptor.getComparator(tableName, cfName);
-
-        while (indexIn.available() > 0)
-        {
-            // TODO this is all kinds of messed up
-            ColumnIndexInfo cIndexInfo = new ColumnIndexInfo(comparator);
-            cIndexInfo = cIndexInfo.deserialize(indexIn);
-            columnIndexList.add(cIndexInfo);
-        }
-
-        return totalBytesRead;
-       }
-
-    /**
-     * Returns the range in which a given column falls in the index
-     * @param columnIndexList the in-memory representation of the column index
-     * @param dataSize the total size of the data
-     * @param totalNumCols total number of columns
-     * @return an object describing a subrange in which the column is serialized
-     */
-       static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
-       {
-               /* find the offset for the column */
-        int size = columnIndexList.size();
-        long start = 0;
-        long end = dataSize;
-        int numColumns = 0;      
-       
-        int index = Collections.binarySearch(columnIndexList, cIndexInfo);
-        if ( index < 0 )
-        {
-            /* We are here which means that the requested column is not an index entry. */
-            index = (++index)*(-1);
-        }
-        else
-        {
-               ++index;
-        }
-
-        /* calculate the starting offset from which we have to read */
-        start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
-
-        if( index < size )
-        {
-               end = columnIndexList.get(index).position();
-            numColumns = columnIndexList.get(index).count();            
-        }
-        else
-        {
-               end = dataSize;  
-            int totalColsIndexed = 0;
-            for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
-            {
-                totalColsIndexed += colPosInfo.count();
-            }
-            numColumns = totalNumCols - totalColsIndexed;
-        }
-
-        return new ColumnRange(start, end, numColumns);
-       }
-
-       /**
-        * Returns the sub-ranges that contain the list of columns in columnNames.
-        * @param columnNames The list of columns whose subranges need to be found
-        * @param columnIndexList the deserialized column indexes
-        * @param dataSize the total size of data
-        * @param totalNumCols the total number of columns
-        * @return a list of subranges which contain all the columns in columnNames
-        */
-       static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
-       {
-               List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();
-
-        if (columnIndexList.size() == 0)
-        {
-            columnRanges.add(new ColumnRange(0, dataSize, totalNumCols));
-        }
-        else
-        {
-            Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
-            for (byte[] name : columnNames)
-            {
-                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator());
-                ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
-                if (offset.get(columnRange.coordinate().start_) == null)
-                {
-                    columnRanges.add(columnRange);
-                    offset.put(columnRange.coordinate().start_, true);
-                }
-            }
-        }
-
-        return columnRanges;
-       }
-
-
-    /**
-     * A column range containing the start and end
-     * offset of the appropriate column index chunk
-     * and the number of columns in that chunk.
-     * @author alakshman
-     *
-     */
-    public static class ColumnRange
-    {
-        private Coordinate coordinate_;
-        private int columnCount_;
-        
-        ColumnRange(long start, long end, int columnCount)
-        {
-            coordinate_ = new Coordinate(start, end);
-            columnCount_ = columnCount;
-        }
-        
-        Coordinate coordinate()
-        {
-            return coordinate_;
-        }
-        
-        int count()
-        {
-            return columnCount_;
-        }                
-    }
-
-       /**
-        * A helper class to generate indexes while
-     * the columns are sorted by name on disk.
-       */
-    public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
-    {
-        private long position_;
-        private int columnCount_;
-        private byte[] name_;
-        private AbstractType comparator_;
-
-        public ColumnIndexInfo(AbstractType comparator_)
-        {
-            this.comparator_ = comparator_;
-        }
-
-        public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator)
-        {
-            this(comparator);
-            assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack
-            name_ = name;
-            position_ = position;
-            columnCount_ = columnCount;
-        }
-                
-        public long position()
-        {
-            return position_;
-        }
-        
-        public void position(long position)
-        {
-            position_ = position;
-        }
-        
-        int count()
-        {
-            return columnCount_;
-        }
-        
-        public void count(int count)
-        {
-            columnCount_ = count;
-        }
-
-        public int compareTo(ColumnIndexInfo rhs)
-        {
-            return comparator_.compare(name_, rhs.name_);
-        }
-
-        public void serialize(DataOutputStream dos) throws IOException
-        {
-            dos.writeLong(position());
-            dos.writeInt(count());
-            ColumnSerializer.writeName(name_, dos);
-        }
-
-        public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException
-        {
-            long position = dis.readLong();
-            int columnCount = dis.readInt();
-            byte[] name = ColumnSerializer.readName(dis);
-            return new ColumnIndexInfo(name, position, columnCount, comparator_);
-        }
-
-        public int size()
-        {
-            // serialized size -- CS.writeName includes a 2-byte length prefix
-            return 8 + 4 + 2 + name_.length;
-        }
-
-        public byte[] name()
-        {
-            return name_;
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+
+/**
+ * Provides helpers to serialize, deserialize and use column indexes.
+ * Author : Karthik Ranganathan ( [email protected] )
+ */
+
+public class IndexHelper
+{
+       /**
+        * Serializes a column index to a data output stream
+        * @param indexSizeInBytes Size of index to be written
+        * @param columnIndexList List of column index entries as objects
+        * @param dos the output stream into which the column index is to be written
+        * @throws IOException
+        */
+       public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException
+       {
+               /* if we have no data to index, then write that there is no index present */
+               if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0)
+               {
+                       dos.writeBoolean(false);
+               }
+               else
+               {
+               /* write if we are storing a column index */
+               dos.writeBoolean(true);
+               /* write the size of the index */
+               dos.writeInt(indexSizeInBytes);
+               for( ColumnIndexInfo cIndexInfo : columnIndexList )
+               {
+                       cIndexInfo.serialize(dos);
+               }
+               }
+       }
+    
+    /**
+     * Skip the bloom filter and the index and return the bytes read.
+     * @param in the data input from which the bloom filter and index 
+     *           should be skipped
+     * @return number of bytes read.
+     * @throws IOException
+     */
+    public static int skipBloomFilterAndIndex(DataInput in) throws IOException
+    {
+        int totalBytesRead = 0;
+        /* size of the bloom filter */
+        int size = in.readInt();
+        totalBytesRead += 4;
+        /* skip the serialized bloom filter */
+        in.skipBytes(size);
+        totalBytesRead += size;
+        /* skip the index on disk */
+        /* read if the file has column indexes */
+        boolean hasColumnIndexes = in.readBoolean();
+        totalBytesRead += 1;
+        if ( hasColumnIndexes )
+        {
+            totalBytesRead += skipIndex(in);
+        }
+        return totalBytesRead;
+    }
+    
+    /**
+     * Skip the bloom filter and return the bytes read.
+     * @param in the data input from which the bloom filter 
+     *           should be skipped
+     * @return number of bytes read.
+     * @throws IOException
+     */
+    public static int skipBloomFilter(DataInput in) throws IOException
+    {
+        int totalBytesRead = 0;
+        /* size of the bloom filter */
+        int size = in.readInt();
+        totalBytesRead += 4;
+        /* skip the serialized bloom filter */
+        in.skipBytes(size);
+        totalBytesRead += size;
+        return totalBytesRead;
+    }
+
+       /**
+        * Skip the index and return the number of bytes read.
+        * @param file the data input from which the index should be skipped
+        * @return number of bytes read from the data input
+        * @throws IOException
+        */
+       public static int skipIndex(DataInput file) throws IOException
+       {
+        /* read only the column index list */
+        int columnIndexSize = file.readInt();
+        int totalBytesRead = 4;
+
+        /* skip the column index data */
+        file.skipBytes(columnIndexSize);
+        totalBytesRead += columnIndexSize;
+
+        return totalBytesRead;
+       }
+    
+    /**
+     * Deserialize the index into a structure and return the number of bytes read.
+     * @param tableName
+     * @param cfName
+     * @param in Input from which the serialized form of the index is read
+     * @param columnIndexList the structure which is filled in with the deserialized index
+     * @return number of bytes read from the input
+     * @throws IOException
+     */
+       static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+       {
+               /* read only the column index list */
+               int columnIndexSize = in.readInt();
+               int totalBytesRead = 4;
+
+               /* read the indexes into a separate buffer */
+               DataOutputBuffer indexOut = new DataOutputBuffer();
+        /* write the data into buffer */
+               indexOut.write(in, columnIndexSize);
+               totalBytesRead += columnIndexSize;
+
+               /* now deserialize the index list */
+        DataInputBuffer indexIn = new DataInputBuffer();
+        indexIn.reset(indexOut.getData(), indexOut.getLength());
+        
+        AbstractType comparator = DatabaseDescriptor.getComparator(tableName, cfName);
+
+        while (indexIn.available() > 0)
+        {
+            // TODO this is all kinds of messed up
+            ColumnIndexInfo cIndexInfo = new ColumnIndexInfo(comparator);
+            cIndexInfo = cIndexInfo.deserialize(indexIn);
+            columnIndexList.add(cIndexInfo);
+        }
+
+        return totalBytesRead;
+       }
+
+    /**
+     * Returns the range in which a given column falls in the index
+     * @param columnIndexList the in-memory representation of the column index
+     * @param dataSize the total size of the data
+     * @param totalNumCols total number of columns
+     * @return an object describing a subrange in which the column is serialized
+     */
+       static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+       {
+               /* find the offset for the column */
+        int size = columnIndexList.size();
+        long start = 0;
+        long end = dataSize;
+        int numColumns = 0;      
+       
+        int index = Collections.binarySearch(columnIndexList, cIndexInfo);
+        if ( index < 0 )
+        {
+            /* We are here which means that the requested column is not an index entry. */
+            index = (++index)*(-1);
+        }
+        else
+        {
+               ++index;
+        }
+
+        /* calculate the starting offset from which we have to read */
+        start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+
+        if( index < size )
+        {
+               end = columnIndexList.get(index).position();
+            numColumns = columnIndexList.get(index).count();            
+        }
+        else
+        {
+               end = dataSize;  
+            int totalColsIndexed = 0;
+            for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
+            {
+                totalColsIndexed += colPosInfo.count();
+            }
+            numColumns = totalNumCols - totalColsIndexed;
+        }
+
+        return new ColumnRange(start, end, numColumns);
+       }
+
+       /**
+        * Returns the sub-ranges that contain the list of columns in columnNames.
+        * @param columnNames The list of columns whose subranges need to be found
+        * @param columnIndexList the deserialized column indexes
+        * @param dataSize the total size of data
+        * @param totalNumCols the total number of columns
+        * @return a list of subranges which contain all the columns in columnNames
+        */
+       static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+       {
+               List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();
+
+        if (columnIndexList.size() == 0)
+        {
+            columnRanges.add(new ColumnRange(0, dataSize, totalNumCols));
+        }
+        else
+        {
+            Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
+            for (byte[] name : columnNames)
+            {
+                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator());
+                ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
+                if (offset.get(columnRange.coordinate().start_) == null)
+                {
+                    columnRanges.add(columnRange);
+                    offset.put(columnRange.coordinate().start_, true);
+                }
+            }
+        }
+
+        return columnRanges;
+       }
+
+
+    /**
+     * A column range containing the start and end
+     * offset of the appropriate column index chunk
+     * and the number of columns in that chunk.
+     * @author alakshman
+     *
+     */
+    public static class ColumnRange
+    {
+        private Coordinate coordinate_;
+        private int columnCount_;
+        
+        ColumnRange(long start, long end, int columnCount)
+        {
+            coordinate_ = new Coordinate(start, end);
+            columnCount_ = columnCount;
+        }
+        
+        Coordinate coordinate()
+        {
+            return coordinate_;
+        }
+        
+        int count()
+        {
+            return columnCount_;
+        }                
+    }
+
+       /**
+        * A helper class to generate indexes while
+     * the columns are sorted by name on disk.
+       */
+    public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
+    {
+        private long position_;
+        private int columnCount_;
+        private byte[] name_;
+        private AbstractType comparator_;
+
+        public ColumnIndexInfo(AbstractType comparator_)
+        {
+            this.comparator_ = comparator_;
+        }
+
+        public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator)
+        {
+            this(comparator);
+            assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack
+            name_ = name;
+            position_ = position;
+            columnCount_ = columnCount;
+        }
+                
+        public long position()
+        {
+            return position_;
+        }
+        
+        public void position(long position)
+        {
+            position_ = position;
+        }
+        
+        int count()
+        {
+            return columnCount_;
+        }
+        
+        public void count(int count)
+        {
+            columnCount_ = count;
+        }
+
+        public int compareTo(ColumnIndexInfo rhs)
+        {
+            return comparator_.compare(name_, rhs.name_);
+        }
+
+        public void serialize(DataOutputStream dos) throws IOException
+        {
+            dos.writeLong(position());
+            dos.writeInt(count());
+            ColumnSerializer.writeName(name_, dos);
+        }
+
+        public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException
+        {
+            long position = dis.readLong();
+            int columnCount = dis.readInt();
+            byte[] name = ColumnSerializer.readName(dis);
+            return new ColumnIndexInfo(name, position, columnCount, comparator_);
+        }
+
+        public int size()
+        {
+            // serialized size -- CS.writeName includes a 2-byte length prefix
+            return 8 + 4 + 2 + name_.length;
+        }
+
+        public byte[] name()
+        {
+            return name_;
+        }
+    }
+}

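The block layout that serialize() and deserializeIndex() above agree on can be illustrated with plain java.io: a "has index" boolean, the index size in bytes, then one entry per chunk consisting of a long position, an int column count, and a length-prefixed column name. The sketch below reproduces that framing; ByteArrayOutputStream and a hand-rolled 2-byte name prefix stand in for Cassandra's DataOutputBuffer and ColumnSerializer.writeName (the 2-byte prefix is inferred from the size() comment in the diff), so treat it as a format illustration under those assumptions rather than project code.

import java.io.*;

public class ColumnIndexLayoutSketch
{
    public static void main(String[] args) throws IOException
    {
        byte[][] names = { "colA".getBytes("UTF-8"), "colM".getBytes("UTF-8") };
        long[] positions = { 512L, 1024L };
        int[] counts = { 128, 96 };

        /* write the block, mirroring IndexHelper.serialize() */
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bytes);
        int indexSizeInBytes = 0;
        for (byte[] name : names)
            indexSizeInBytes += 8 + 4 + 2 + name.length; // ColumnIndexInfo.size()
        dos.writeBoolean(true);         // a column index is present
        dos.writeInt(indexSizeInBytes); // total size of the entries
        for (int i = 0; i < names.length; i++)
        {
            dos.writeLong(positions[i]);     // offset of the chunk
            dos.writeInt(counts[i]);         // number of columns in the chunk
            dos.writeShort(names[i].length); // assumed 2-byte length prefix
            dos.write(names[i]);
        }
        dos.flush();

        /* read it back, following the bookkeeping in deserializeIndex() */
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        if (in.readBoolean())
        {
            int remaining = in.readInt();
            while (remaining > 0)
            {
                long position = in.readLong();
                int count = in.readInt();
                byte[] name = new byte[in.readShort()];
                in.readFully(name);
                remaining -= 8 + 4 + 2 + name.length;
                System.out.println(new String(name, "UTF-8") + " @ " + position + " (" + count + " columns)");
            }
        }
    }
}
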
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu Jul 30 15:30:21 2009
@@ -69,7 +69,7 @@
 
     static String parseTableName(String filename)
     {
-        return new File(filename).getParentFile().getName();        
+        return new File(filename).getParentFile().getName();        
     }
 
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jul 30 15:30:21 2009
@@ -1,434 +1,434 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
-
-public class SSTableReader extends SSTable
-{
-    private static Logger logger = Logger.getLogger(SSTableReader.class);
-
-    private static FileSSTableMap openedFiles = new FileSSTableMap();
-
-    public static int indexInterval()
-    {
-        return INDEX_INTERVAL;
-    }
-
-    // todo can we refactor to take list of sstables?
-    public static int getApproximateKeyCount(List<String> dataFiles)
-    {
-        int count = 0;
-
-        for (String dataFileName : dataFiles)
-        {
-            SSTableReader sstable = openedFiles.get(dataFileName);
-            assert sstable != null;
-            int indexKeyCount = sstable.getIndexPositions().size();
-            count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
-            if (logger.isDebugEnabled())
-                logger.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
-        }
-
-        return count;
-    }
-
-    /**
-     * Get all indexed keys in the SSTable.
-     */
-    public static List<String> getIndexedKeys()
-    {
-        List<String> indexedKeys = new ArrayList<String>();
-
-        for (SSTableReader sstable : openedFiles.values())
-        {
-            for (KeyPosition kp : sstable.getIndexPositions())
-            {
-                indexedKeys.add(kp.key);
-            }
-        }
-        Collections.sort(indexedKeys);
-
-        return indexedKeys;
-    }
-
-    public static synchronized SSTableReader open(String dataFileName) throws IOException
-    {
-        return open(dataFileName, StorageService.getPartitioner(), DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName)));
-    }
-
-    public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner, double cacheFraction) throws IOException
-    {
-        SSTableReader sstable = openedFiles.get(dataFileName);
-        if (sstable == null)
-        {
-            assert partitioner != null;
-            sstable = new SSTableReader(dataFileName, partitioner);
-
-            long start = System.currentTimeMillis();
-            sstable.loadIndexFile();
-            sstable.loadBloomFilter();
-            if (cacheFraction > 0)
-            {
-                sstable.keyCache = createKeyCache((int)((sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL * cacheFraction));
-            }
-            if (logger.isDebugEnabled())
-                logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
-
-            openedFiles.put(dataFileName, sstable);
-        }
-        return sstable;
-    }
-
-    public static synchronized SSTableReader get(String dataFileName) throws IOException
-    {
-        SSTableReader sstable = openedFiles.get(dataFileName);
-        assert sstable != null;
-        return sstable;
-    }
-
-    public static ConcurrentLinkedHashMap<String, Long> createKeyCache(int size)
-    {
-        return ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, size);
-    }
-
-
-    private ConcurrentLinkedHashMap<String, Long> keyCache;
-
-    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<String, Long> keyCache)
-    {
-        super(filename, partitioner);
-        this.indexPositions = indexPositions;
-        this.bf = bloomFilter;
-        this.keyCache = keyCache;
-        synchronized (SSTableReader.this)
-        {
-            openedFiles.put(filename, this);
-        }
-    }
-
-    private SSTableReader(String filename, IPartitioner partitioner)
-    {
-        super(filename, partitioner);
-    }
-
-    public List<KeyPosition> getIndexPositions()
-    {
-        return indexPositions;
-    }
-
-    private void loadBloomFilter() throws IOException
-    {
-        DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
-        bf = BloomFilter.serializer().deserialize(stream);
-    }
-
-    private void loadIndexFile() throws IOException
-    {
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        indexPositions = new ArrayList<KeyPosition>();
-
-        int i = 0;
-        long indexSize = input.length();
-        while (true)
-        {
-            long indexPosition = input.getFilePointer();
-            if (indexPosition == indexSize)
-            {
-                break;
-            }
-            String decoratedKey = input.readUTF();
-            input.readLong();
-            if (i++ % INDEX_INTERVAL == 0)
-            {
-                indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
-            }
-        }
-    }
-
-    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
-    private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
-    {
-        assert indexPositions != null && indexPositions.size() > 0;
-        int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
-        if (index < 0)
-        {
-            // binary search gives us the first index _greater_ than the key searched for,
-            // i.e., its insertion position
-            int greaterThan = (index + 1) * -1;
-            if (greaterThan == 0)
-                return -1;
-            return indexPositions.get(greaterThan - 1).position;
-        }
-        else
-        {
-            return indexPositions.get(index).position;
-        }
-    }
-
-    /**
-     * returns the position in the data file to find the given key, or -1 if the key is not present
-     */
-    public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
-    {
-        if (!bf.isPresent(decoratedKey))
-            return -1;
-        if (keyCache != null)
-        {
-            Long cachedPosition = keyCache.get(decoratedKey);
-            if (cachedPosition != null)
-            {
-                return cachedPosition;
-            }
-        }
-        long start = getIndexScanPosition(decoratedKey, partitioner);
-        if (start < 0)
-        {
-            return -1;
-        }
-
-        // TODO mmap the index file?
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
-        input.seek(start);
-        int i = 0;
-        try
-        {
-            do
-            {
-                String indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = input.readUTF();
-                }
-                catch (EOFException e)
-                {
-                    return -1;
-                }
-                long position = input.readLong();
-                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
-                if (v == 0)
-                {
-                    if (keyCache != null)
-                        keyCache.put(decoratedKey, position);
-                    return position;
-                }
-                if (v > 0)
-                    return -1;
-            } while  (++i < INDEX_INTERVAL);
-        }
-        finally
-        {
-            input.close();
-        }
-        return -1;
-    }
-
-    /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
-    public long getNearestPosition(String decoratedKey) throws IOException
-    {
-        long start = getIndexScanPosition(decoratedKey, partitioner);
-        if (start < 0)
-        {
-            return 0;
-        }
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
-        input.seek(start);
-        try
-        {
-            while (true)
-            {
-                String indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = input.readUTF();
-                }
-                catch (EOFException e)
-                {
-                    return -1;
-                }
-                long position = input.readLong();
-                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
-                if (v >= 0)
-                    return position;
-            }
-        }
-        finally
-        {
-            input.close();
-        }
-    }
-
-    public DataInputBuffer next(final String clientKey, String cfName, SortedSet<byte[]> columnNames) throws IOException
-    {
-        IFileReader dataReader = null;
-        try
-        {
-            dataReader = SequenceFile.reader(dataFile);
-            String decoratedKey = partitioner.decorateKey(clientKey);
-            long position = getPosition(decoratedKey, partitioner);
-
-            DataOutputBuffer bufOut = new DataOutputBuffer();
-            DataInputBuffer bufIn = new DataInputBuffer();
-            long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, position);
-            if (bytesRead != -1L)
-            {
-                if (bufOut.getLength() > 0)
-                {
-                    bufIn.reset(bufOut.getData(), bufOut.getLength());
-                    /* read the key even though we do not use it */
-                    bufIn.readUTF();
-                    bufIn.readInt();
-                }
-            }
-            return bufIn;
-        }
-        finally
-        {
-            if (dataReader != null)
-            {
-                dataReader.close();
-            }
-        }
-    }
-
-    /**
-     * obtain a BlockReader for the getColumnSlice call.
-     */
-    public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException
-    {
-        IFileReader dataReader = SequenceFile.reader(dataFile);
-
-        try
-        {
-            /* Morph key into actual key based on the partition type. */
-            String decoratedKey = partitioner.decorateKey(key);
-            long position = getPosition(decoratedKey, partitioner);
-            AbstractType comparator = DatabaseDescriptor.getComparator(getTableName(), cfName);
-            return new ColumnGroupReader(dataFile, decoratedKey, cfName, comparator, startColumn, isAscending, position);
-        }
-        finally
-        {
-            dataReader.close();
-        }
-    }
-
-    public void delete() throws IOException
-    {
-        FileUtils.deleteWithConfirm(new File(dataFile));
-        FileUtils.deleteWithConfirm(new File(indexFilename(dataFile)));
-        FileUtils.deleteWithConfirm(new File(filterFilename(dataFile)));
-        openedFiles.remove(dataFile);
-    }
-
-    /** obviously only for testing */
-    public void forceBloomFilterFailures()
-    {
-        bf = BloomFilter.alwaysMatchingBloomFilter();
-    }
-
-    static void reopenUnsafe() throws IOException // testing only
-    {
-        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values());
-        openedFiles.clear();
-        for (SSTableReader sstable : sstables)
-        {
-            SSTableReader.open(sstable.dataFile, sstable.partitioner, 0.01);
-        }
-    }
-
-    IPartitioner getPartitioner()
-    {
-        return partitioner;
-    }
-
-    public FileStruct getFileStruct() throws IOException
-    {
-        return new FileStruct(this);
-    }
-
-    public String getTableName()
-    {
-        return parseTableName(dataFile);
-    }
-
-    public static void deleteAll() throws IOException
-    {
-        for (SSTableReader sstable : openedFiles.values())
-        {
-            sstable.delete();
-        }
-    }
-}
-
-class FileSSTableMap
-{
-    private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
-
-    public SSTableReader get(String filename)
-    {
-        try
-        {
-            return map.get(new File(filename).getCanonicalPath());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public SSTableReader put(String filename, SSTableReader value)
-    {
-        try
-        {
-            return map.put(new File(filename).getCanonicalPath(), value);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Collection<SSTableReader> values()
-    {
-        return map.values();
-    }
-
-    public void clear()
-    {
-        map.clear();
-    }
-
-    public void remove(String filename) throws IOException
-    {
-        map.remove(new File(filename).getCanonicalPath());
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.SequenceFile.ColumnGroupReader;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
+
+public class SSTableReader extends SSTable
+{
+    private static Logger logger = Logger.getLogger(SSTableReader.class);
+
+    private static FileSSTableMap openedFiles = new FileSSTableMap();
+
+    public static int indexInterval()
+    {
+        return INDEX_INTERVAL;
+    }
+
+    // todo can we refactor to take list of sstables?
+    public static int getApproximateKeyCount(List<String> dataFiles)
+    {
+        int count = 0;
+
+        for (String dataFileName : dataFiles)
+        {
+            SSTableReader sstable = openedFiles.get(dataFileName);
+            assert sstable != null;
+            int indexKeyCount = sstable.getIndexPositions().size();
+            count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
+            if (logger.isDebugEnabled())
+                logger.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
+        }
+
+        return count;
+    }
+
+    /**
+     * Get all indexed keys in the SSTable.
+     */
+    public static List<String> getIndexedKeys()
+    {
+        List<String> indexedKeys = new ArrayList<String>();
+
+        for (SSTableReader sstable : openedFiles.values())
+        {
+            for (KeyPosition kp : sstable.getIndexPositions())
+            {
+                indexedKeys.add(kp.key);
+            }
+        }
+        Collections.sort(indexedKeys);
+
+        return indexedKeys;
+    }
+
+    public static synchronized SSTableReader open(String dataFileName) throws IOException
+    {
+        return open(dataFileName, StorageService.getPartitioner(), DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName)));
+    }
+
+    public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner, double cacheFraction) throws IOException
+    {
+        SSTableReader sstable = openedFiles.get(dataFileName);
+        if (sstable == null)
+        {
+            assert partitioner != null;
+            sstable = new SSTableReader(dataFileName, partitioner);
+
+            long start = System.currentTimeMillis();
+            sstable.loadIndexFile();
+            sstable.loadBloomFilter();
+            if (cacheFraction > 0)
+            {
+                sstable.keyCache = createKeyCache((int)((sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL * cacheFraction));
+            }
+            if (logger.isDebugEnabled())
+                logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
+
+            openedFiles.put(dataFileName, sstable);
+        }
+        return sstable;
+    }
+
+    public static synchronized SSTableReader get(String dataFileName) throws IOException
+    {
+        SSTableReader sstable = openedFiles.get(dataFileName);
+        assert sstable != null;
+        return sstable;
+    }
+
+    public static ConcurrentLinkedHashMap<String, Long> createKeyCache(int size)
+    {
+        return ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, size);
+    }
+
+
+    private ConcurrentLinkedHashMap<String, Long> keyCache;
+
+    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<String, Long> keyCache)
+    {
+        super(filename, partitioner);
+        this.indexPositions = indexPositions;
+        this.bf = bloomFilter;
+        this.keyCache = keyCache;
+        synchronized (SSTableReader.this)
+        {
+            openedFiles.put(filename, this);
+        }
+    }
+
+    private SSTableReader(String filename, IPartitioner partitioner)
+    {
+        super(filename, partitioner);
+    }
+
+    public List<KeyPosition> getIndexPositions()
+    {
+        return indexPositions;
+    }
+
+    private void loadBloomFilter() throws IOException
+    {
+        DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
+        bf = BloomFilter.serializer().deserialize(stream);
+    }
+
+    private void loadIndexFile() throws IOException
+    {
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
+        indexPositions = new ArrayList<KeyPosition>();
+
+        int i = 0;
+        long indexSize = input.length();
+        while (true)
+        {
+            long indexPosition = input.getFilePointer();
+            if (indexPosition == indexSize)
+            {
+                break;
+            }
+            String decoratedKey = input.readUTF();
+            input.readLong();
+            if (i++ % INDEX_INTERVAL == 0)
+            {
+                indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
+            }
+        }
+    }
+
+    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
+    private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner)
+    {
+        assert indexPositions != null && indexPositions.size() > 0;
+        int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
+        if (index < 0)
+        {
+            // binary search gives us the first index _greater_ than the key searched for,
+            // i.e., its insertion position
+            int greaterThan = (index + 1) * -1;
+            if (greaterThan == 0)
+                return -1;
+            return indexPositions.get(greaterThan - 1).position;
+        }
+        else
+        {
+            return indexPositions.get(index).position;
+        }
+    }
+
+    /**
+     * returns the position in the data file to find the given key, or -1 if the key is not present
+     */
+    public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException
+    {
+        if (!bf.isPresent(decoratedKey))
+            return -1;
+        if (keyCache != null)
+        {
+            Long cachedPosition = keyCache.get(decoratedKey);
+            if (cachedPosition != null)
+            {
+                return cachedPosition;
+            }
+        }
+        long start = getIndexScanPosition(decoratedKey, partitioner);
+        if (start < 0)
+        {
+            return -1;
+        }
+
+        // TODO mmap the index file?
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
+        input.seek(start);
+        int i = 0;
+        try
+        {
+            do
+            {
+                String indexDecoratedKey;
+                try
+                {
+                    indexDecoratedKey = input.readUTF();
+                }
+                catch (EOFException e)
+                {
+                    return -1;
+                }
+                long position = input.readLong();
+                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
+                if (v == 0)
+                {
+                    if (keyCache != null)
+                        keyCache.put(decoratedKey, position);
+                    return position;
+                }
+                if (v > 0)
+                    return -1;
+            } while  (++i < INDEX_INTERVAL);
+        }
+        finally
+        {
+            input.close();
+        }
+        return -1;
+    }
+
+    /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
+    public long getNearestPosition(String decoratedKey) throws IOException
+    {
+        long start = getIndexScanPosition(decoratedKey, partitioner);
+        if (start < 0)
+        {
+            return 0;
+        }
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r");
+        input.seek(start);
+        try
+        {
+            while (true)
+            {
+                String indexDecoratedKey;
+                try
+                {
+                    indexDecoratedKey = input.readUTF();
+                }
+                catch (EOFException e)
+                {
+                    return -1;
+                }
+                long position = input.readLong();
+                int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey);
+                if (v >= 0)
+                    return position;
+            }
+        }
+        finally
+        {
+            input.close();
+        }
+    }
+
+    public DataInputBuffer next(final String clientKey, String cfName, SortedSet<byte[]> columnNames) throws IOException
+    {
+        IFileReader dataReader = null;
+        try
+        {
+            dataReader = SequenceFile.reader(dataFile);
+            String decoratedKey = partitioner.decorateKey(clientKey);
+            long position = getPosition(decoratedKey, partitioner);
+
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            DataInputBuffer bufIn = new DataInputBuffer();
+            long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, position);
+            if (bytesRead != -1L)
+            {
+                if (bufOut.getLength() > 0)
+                {
+                    bufIn.reset(bufOut.getData(), bufOut.getLength());
+                    /* read the key even though we do not use it */
+                    bufIn.readUTF();
+                    bufIn.readInt();
+                }
+            }
+            return bufIn;
+        }
+        finally
+        {
+            if (dataReader != null)
+            {
+                dataReader.close();
+            }
+        }
+    }
+
+    /**
+     * obtain a BlockReader for the getColumnSlice call.
+     */
+    public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException
+    {
+        IFileReader dataReader = SequenceFile.reader(dataFile);
+
+        try
+        {
+            /* Morph key into actual key based on the partition type. */
+            String decoratedKey = partitioner.decorateKey(key);
+            long position = getPosition(decoratedKey, partitioner);
+            AbstractType comparator = DatabaseDescriptor.getComparator(getTableName(), cfName);
+            return new ColumnGroupReader(dataFile, decoratedKey, cfName, comparator, startColumn, isAscending, position);
+        }
+        finally
+        {
+            dataReader.close();
+        }
+    }
+
+    public void delete() throws IOException
+    {
+        FileUtils.deleteWithConfirm(new File(dataFile));
+        FileUtils.deleteWithConfirm(new File(indexFilename(dataFile)));
+        FileUtils.deleteWithConfirm(new File(filterFilename(dataFile)));
+        openedFiles.remove(dataFile);
+    }
+
+    /** obviously only for testing */
+    public void forceBloomFilterFailures()
+    {
+        bf = BloomFilter.alwaysMatchingBloomFilter();
+    }
+
+    static void reopenUnsafe() throws IOException // testing only
+    {
+        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values());
+        openedFiles.clear();
+        for (SSTableReader sstable : sstables)
+        {
+            SSTableReader.open(sstable.dataFile, sstable.partitioner, 0.01);
+        }
+    }
+
+    IPartitioner getPartitioner()
+    {
+        return partitioner;
+    }
+
+    public FileStruct getFileStruct() throws IOException
+    {
+        return new FileStruct(this);
+    }
+
+    public String getTableName()
+    {
+        return parseTableName(dataFile);
+    }
+
+    public static void deleteAll() throws IOException
+    {
+        for (SSTableReader sstable : openedFiles.values())
+        {
+            sstable.delete();
+        }
+    }
+}
+
+class FileSSTableMap
+{
+    private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
+
+    public SSTableReader get(String filename)
+    {
+        try
+        {
+            return map.get(new File(filename).getCanonicalPath());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public SSTableReader put(String filename, SSTableReader value)
+    {
+        try
+        {
+            return map.put(new File(filename).getCanonicalPath(), value);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Collection<SSTableReader> values()
+    {
+        return map.values();
+    }
+
+    public void clear()
+    {
+        map.clear();
+    }
+
+    public void remove(String filename) throws IOException
+    {
+        map.remove(new File(filename).getCanonicalPath());
+    }
+}

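The lookup path in SSTableReader.getPosition() above is a two-level search: loadIndexFile() keeps one in-memory sample per INDEX_INTERVAL index entries, getIndexScanPosition() binary-searches those samples for the nearest entry at or before the key, and getPosition() then scans forward at most INDEX_INTERVAL index entries on disk. Below is a self-contained sketch of that strategy, with a sorted in-memory list standing in for the index file and list indices standing in for data-file offsets; INDEX_INTERVAL and the binary-search arithmetic mirror the diff, everything else is illustrative.

import java.util.*;

public class IndexScanSketch
{
    static final int INDEX_INTERVAL = 128;

    static List<String> indexFile;        // all keys, sorted (stands in for the index file)
    static List<Integer> indexPositions;  // position of every INDEX_INTERVAL-th key

    /* nearest sampled position at or before the key, or -1 if the key sorts before all samples */
    static int getIndexScanPosition(String key)
    {
        List<String> sampledKeys = new ArrayList<String>();
        for (int pos : indexPositions)
            sampledKeys.add(indexFile.get(pos));
        int index = Collections.binarySearch(sampledKeys, key);
        if (index < 0)
        {
            // on a miss binarySearch returns -(insertionPoint) - 1; the
            // nearest preceding sample is at insertionPoint - 1
            int greaterThan = (index + 1) * -1;
            if (greaterThan == 0)
                return -1;
            return indexPositions.get(greaterThan - 1);
        }
        return indexPositions.get(index);
    }

    /* scan forward at most INDEX_INTERVAL entries, as getPosition() does on disk */
    static int getPosition(String key)
    {
        int start = getIndexScanPosition(key);
        if (start < 0)
            return -1;
        for (int i = start; i < indexFile.size() && i < start + INDEX_INTERVAL; i++)
        {
            int v = indexFile.get(i).compareTo(key);
            if (v == 0)
                return i;  // found; the real code returns the data-file offset
            if (v > 0)
                return -1; // passed where the key would be: not present
        }
        return -1;
    }

    public static void main(String[] args)
    {
        indexFile = new ArrayList<String>();
        indexPositions = new ArrayList<Integer>();
        for (int i = 0; i < 1000; i++)
        {
            indexFile.add(String.format("key%04d", i));
            if (i % INDEX_INTERVAL == 0)
                indexPositions.add(i);
        }
        System.out.println(getPosition("key0700"));  // prints 700
        System.out.println(getPosition("key0700x")); // prints -1
    }
}

Sampling only every 128th entry keeps the in-memory index small while bounding each miss to one seek into the index file plus a short linear scan.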
