Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Thu Jul 30 15:30:21 2009 @@ -1,355 +1,355 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.io; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.*; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnSerializer; -import org.apache.cassandra.db.marshal.AbstractType; - - -/** - * Provides helper to serialize, deserialize and use column indexes. - * Author : Karthik Ranganathan ( [email protected] ) - */ - -public class IndexHelper -{ - /** - * Serializes a column index to a data output stream - * @param indexSizeInBytes Size of index to be written - * @param columnIndexList List of column index entries as objects - * @param dos the output stream into which the column index is to be written - * @throws IOException - */ - public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException - { - /* if we have no data to index, the write that there is no index present */ - if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0) - { - dos.writeBoolean(false); - } - else - { - /* write if we are storing a column index */ - dos.writeBoolean(true); - /* write the size of the index */ - dos.writeInt(indexSizeInBytes); - for( ColumnIndexInfo cIndexInfo : columnIndexList ) - { - cIndexInfo.serialize(dos); - } - } - } - - /** - * Skip the bloom filter and the index and return the bytes read. - * @param in the data input from which the bloom filter and index - * should be skipped - * @return number of bytes read. - * @throws IOException - */ - public static int skipBloomFilterAndIndex(DataInput in) throws IOException - { - int totalBytesRead = 0; - /* size of the bloom filter */ - int size = in.readInt(); - totalBytesRead += 4; - /* skip the serialized bloom filter */ - in.skipBytes(size); - totalBytesRead += size; - /* skip the index on disk */ - /* read if the file has column indexes */ - boolean hasColumnIndexes = in.readBoolean(); - totalBytesRead += 1; - if ( hasColumnIndexes ) - { - totalBytesRead += skipIndex(in); - } - return totalBytesRead; - } - - /** - * Skip the bloom filter and return the bytes read. 
- * @param in the data input from which the bloom filter - * should be skipped - * @return number of bytes read. - * @throws IOException - */ - public static int skipBloomFilter(DataInput in) throws IOException - { - int totalBytesRead = 0; - /* size of the bloom filter */ - int size = in.readInt(); - totalBytesRead += 4; - /* skip the serialized bloom filter */ - in.skipBytes(size); - totalBytesRead += size; - return totalBytesRead; - } - - /** - * Skip the index and return the number of bytes read. - * @param file the data input from which the index should be skipped - * @return number of bytes read from the data input - * @throws IOException - */ - public static int skipIndex(DataInput file) throws IOException - { - /* read only the column index list */ - int columnIndexSize = file.readInt(); - int totalBytesRead = 4; - - /* skip the column index data */ - file.skipBytes(columnIndexSize); - totalBytesRead += columnIndexSize; - - return totalBytesRead; - } - - /** - * Deserialize the index into a structure and return the number of bytes read. - * @param tableName - *...@param in Input from which the serialized form of the index is read - * @param columnIndexList the structure which is filled in with the deserialized index @return number of bytes read from the input - * @throws IOException - */ - static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException - { - /* read only the column index list */ - int columnIndexSize = in.readInt(); - int totalBytesRead = 4; - - /* read the indexes into a separate buffer */ - DataOutputBuffer indexOut = new DataOutputBuffer(); - /* write the data into buffer */ - indexOut.write(in, columnIndexSize); - totalBytesRead += columnIndexSize; - - /* now deserialize the index list */ - DataInputBuffer indexIn = new DataInputBuffer(); - indexIn.reset(indexOut.getData(), indexOut.getLength()); - - AbstractType comparator = DatabaseDescriptor.getComparator(tableName, cfName); - - while (indexIn.available() > 0) - { - // TODO this is all kinds of messed up - ColumnIndexInfo cIndexInfo = new ColumnIndexInfo(comparator); - cIndexInfo = cIndexInfo.deserialize(indexIn); - columnIndexList.add(cIndexInfo); - } - - return totalBytesRead; - } - - /** - * Returns the range in which a given column falls in the index - * @param columnIndexList the in-memory representation of the column index - * @param dataSize the total size of the data - * @param totalNumCols total number of columns - * @return an object describing a subrange in which the column is serialized - */ - static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols) - { - /* find the offset for the column */ - int size = columnIndexList.size(); - long start = 0; - long end = dataSize; - int numColumns = 0; - - int index = Collections.binarySearch(columnIndexList, cIndexInfo); - if ( index < 0 ) - { - /* We are here which means that the requested column is not an index. */ - index = (++index)*(-1); - } - else - { - ++index; - } - - /* calculate the starting offset from which we have to read */ - start = (index == 0) ? 
0 : columnIndexList.get(index - 1).position(); - - if( index < size ) - { - end = columnIndexList.get(index).position(); - numColumns = columnIndexList.get(index).count(); - } - else - { - end = dataSize; - int totalColsIndexed = 0; - for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList ) - { - totalColsIndexed += colPosInfo.count(); - } - numColumns = totalNumCols - totalColsIndexed; - } - - return new ColumnRange(start, end, numColumns); - } - - /** - * Returns the sub-ranges that contain the list of columns in columnNames. - * @param columnNames The list of columns whose subranges need to be found - * @param columnIndexList the deserialized column indexes - * @param dataSize the total size of data - * @param totalNumCols the total number of columns - * @return a list of subranges which contain all the columns in columnNames - */ - static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols) - { - List<ColumnRange> columnRanges = new ArrayList<ColumnRange>(); - - if (columnIndexList.size() == 0) - { - columnRanges.add(new ColumnRange(0, dataSize, totalNumCols)); - } - else - { - Map<Long, Boolean> offset = new HashMap<Long, Boolean>(); - for (byte[] name : columnNames) - { - IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator()); - ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols); - if (offset.get(columnRange.coordinate().start_) == null) - { - columnRanges.add(columnRange); - offset.put(columnRange.coordinate().start_, true); - } - } - } - - return columnRanges; - } - - - /** - * A column range containing the start and end - * offset of the appropriate column index chunk - * and the number of columns in that chunk. - * @author alakshman - * - */ - public static class ColumnRange - { - private Coordinate coordinate_; - private int columnCount_; - - ColumnRange(long start, long end, int columnCount) - { - coordinate_ = new Coordinate(start, end); - columnCount_ = columnCount; - } - - Coordinate coordinate() - { - return coordinate_; - } - - int count() - { - return columnCount_; - } - } - - /** - * A helper class to generate indexes while - * the columns are sorted by name on disk. 
- */ - public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo> - { - private long position_; - private int columnCount_; - private byte[] name_; - private AbstractType comparator_; - - public ColumnIndexInfo(AbstractType comparator_) - { - this.comparator_ = comparator_; - } - - public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator) - { - this(comparator); - assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack - name_ = name; - position_ = position; - columnCount_ = columnCount; - } - - public long position() - { - return position_; - } - - public void position(long position) - { - position_ = position; - } - - int count() - { - return columnCount_; - } - - public void count(int count) - { - columnCount_ = count; - } - - public int compareTo(ColumnIndexInfo rhs) - { - return comparator_.compare(name_, rhs.name_); - } - - public void serialize(DataOutputStream dos) throws IOException - { - dos.writeLong(position()); - dos.writeInt(count()); - ColumnSerializer.writeName(name_, dos); - } - - public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException - { - long position = dis.readLong(); - int columnCount = dis.readInt(); - byte[] name = ColumnSerializer.readName(dis); - return new ColumnIndexInfo(name, position, columnCount, comparator_); - } - - public int size() - { - // serialized size -- CS.writeName includes a 2-byte length prefix - return 8 + 4 + 2 + name_.length; - } - - public byte[] name() - { - return name_; - } - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.*; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnSerializer; +import org.apache.cassandra.db.marshal.AbstractType; + + +/** + * Provides helper to serialize, deserialize and use column indexes. 
+ * Author : Karthik Ranganathan ( [email protected] ) + */ + +public class IndexHelper +{ + /** + * Serializes a column index to a data output stream + * @param indexSizeInBytes Size of index to be written + * @param columnIndexList List of column index entries as objects + * @param dos the output stream into which the column index is to be written + * @throws IOException + */ + public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException + { + /* if we have no data to index, the write that there is no index present */ + if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0) + { + dos.writeBoolean(false); + } + else + { + /* write if we are storing a column index */ + dos.writeBoolean(true); + /* write the size of the index */ + dos.writeInt(indexSizeInBytes); + for( ColumnIndexInfo cIndexInfo : columnIndexList ) + { + cIndexInfo.serialize(dos); + } + } + } + + /** + * Skip the bloom filter and the index and return the bytes read. + * @param in the data input from which the bloom filter and index + * should be skipped + * @return number of bytes read. + * @throws IOException + */ + public static int skipBloomFilterAndIndex(DataInput in) throws IOException + { + int totalBytesRead = 0; + /* size of the bloom filter */ + int size = in.readInt(); + totalBytesRead += 4; + /* skip the serialized bloom filter */ + in.skipBytes(size); + totalBytesRead += size; + /* skip the index on disk */ + /* read if the file has column indexes */ + boolean hasColumnIndexes = in.readBoolean(); + totalBytesRead += 1; + if ( hasColumnIndexes ) + { + totalBytesRead += skipIndex(in); + } + return totalBytesRead; + } + + /** + * Skip the bloom filter and return the bytes read. + * @param in the data input from which the bloom filter + * should be skipped + * @return number of bytes read. + * @throws IOException + */ + public static int skipBloomFilter(DataInput in) throws IOException + { + int totalBytesRead = 0; + /* size of the bloom filter */ + int size = in.readInt(); + totalBytesRead += 4; + /* skip the serialized bloom filter */ + in.skipBytes(size); + totalBytesRead += size; + return totalBytesRead; + } + + /** + * Skip the index and return the number of bytes read. + * @param file the data input from which the index should be skipped + * @return number of bytes read from the data input + * @throws IOException + */ + public static int skipIndex(DataInput file) throws IOException + { + /* read only the column index list */ + int columnIndexSize = file.readInt(); + int totalBytesRead = 4; + + /* skip the column index data */ + file.skipBytes(columnIndexSize); + totalBytesRead += columnIndexSize; + + return totalBytesRead; + } + + /** + * Deserialize the index into a structure and return the number of bytes read. 
+ * @param tableName + *...@param in Input from which the serialized form of the index is read + * @param columnIndexList the structure which is filled in with the deserialized index @return number of bytes read from the input + * @throws IOException + */ + static int deserializeIndex(String tableName, String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException + { + /* read only the column index list */ + int columnIndexSize = in.readInt(); + int totalBytesRead = 4; + + /* read the indexes into a separate buffer */ + DataOutputBuffer indexOut = new DataOutputBuffer(); + /* write the data into buffer */ + indexOut.write(in, columnIndexSize); + totalBytesRead += columnIndexSize; + + /* now deserialize the index list */ + DataInputBuffer indexIn = new DataInputBuffer(); + indexIn.reset(indexOut.getData(), indexOut.getLength()); + + AbstractType comparator = DatabaseDescriptor.getComparator(tableName, cfName); + + while (indexIn.available() > 0) + { + // TODO this is all kinds of messed up + ColumnIndexInfo cIndexInfo = new ColumnIndexInfo(comparator); + cIndexInfo = cIndexInfo.deserialize(indexIn); + columnIndexList.add(cIndexInfo); + } + + return totalBytesRead; + } + + /** + * Returns the range in which a given column falls in the index + * @param columnIndexList the in-memory representation of the column index + * @param dataSize the total size of the data + * @param totalNumCols total number of columns + * @return an object describing a subrange in which the column is serialized + */ + static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols) + { + /* find the offset for the column */ + int size = columnIndexList.size(); + long start = 0; + long end = dataSize; + int numColumns = 0; + + int index = Collections.binarySearch(columnIndexList, cIndexInfo); + if ( index < 0 ) + { + /* We are here which means that the requested column is not an index. */ + index = (++index)*(-1); + } + else + { + ++index; + } + + /* calculate the starting offset from which we have to read */ + start = (index == 0) ? 0 : columnIndexList.get(index - 1).position(); + + if( index < size ) + { + end = columnIndexList.get(index).position(); + numColumns = columnIndexList.get(index).count(); + } + else + { + end = dataSize; + int totalColsIndexed = 0; + for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList ) + { + totalColsIndexed += colPosInfo.count(); + } + numColumns = totalNumCols - totalColsIndexed; + } + + return new ColumnRange(start, end, numColumns); + } + + /** + * Returns the sub-ranges that contain the list of columns in columnNames. 
+ * @param columnNames The list of columns whose subranges need to be found + * @param columnIndexList the deserialized column indexes + * @param dataSize the total size of data + * @param totalNumCols the total number of columns + * @return a list of subranges which contain all the columns in columnNames + */ + static List<ColumnRange> getMultiColumnRangesFromNameIndex(SortedSet<byte[]> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols) + { + List<ColumnRange> columnRanges = new ArrayList<ColumnRange>(); + + if (columnIndexList.size() == 0) + { + columnRanges.add(new ColumnRange(0, dataSize, totalNumCols)); + } + else + { + Map<Long, Boolean> offset = new HashMap<Long, Boolean>(); + for (byte[] name : columnNames) + { + IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(name, 0, 0, (AbstractType)columnNames.comparator()); + ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols); + if (offset.get(columnRange.coordinate().start_) == null) + { + columnRanges.add(columnRange); + offset.put(columnRange.coordinate().start_, true); + } + } + } + + return columnRanges; + } + + + /** + * A column range containing the start and end + * offset of the appropriate column index chunk + * and the number of columns in that chunk. + * @author alakshman + * + */ + public static class ColumnRange + { + private Coordinate coordinate_; + private int columnCount_; + + ColumnRange(long start, long end, int columnCount) + { + coordinate_ = new Coordinate(start, end); + columnCount_ = columnCount; + } + + Coordinate coordinate() + { + return coordinate_; + } + + int count() + { + return columnCount_; + } + } + + /** + * A helper class to generate indexes while + * the columns are sorted by name on disk. + */ + public static class ColumnIndexInfo implements Comparable<ColumnIndexInfo> + { + private long position_; + private int columnCount_; + private byte[] name_; + private AbstractType comparator_; + + public ColumnIndexInfo(AbstractType comparator_) + { + this.comparator_ = comparator_; + } + + public ColumnIndexInfo(byte[] name, long position, int columnCount, AbstractType comparator) + { + this(comparator); + assert name.length == 0 || !"".equals(comparator.getString(name)); // Todo r/m length == 0 hack + name_ = name; + position_ = position; + columnCount_ = columnCount; + } + + public long position() + { + return position_; + } + + public void position(long position) + { + position_ = position; + } + + int count() + { + return columnCount_; + } + + public void count(int count) + { + columnCount_ = count; + } + + public int compareTo(ColumnIndexInfo rhs) + { + return comparator_.compare(name_, rhs.name_); + } + + public void serialize(DataOutputStream dos) throws IOException + { + dos.writeLong(position()); + dos.writeInt(count()); + ColumnSerializer.writeName(name_, dos); + } + + public ColumnIndexInfo deserialize(DataInputStream dis) throws IOException + { + long position = dis.readLong(); + int columnCount = dis.readInt(); + byte[] name = ColumnSerializer.readName(dis); + return new ColumnIndexInfo(name, position, columnCount, comparator_); + } + + public int size() + { + // serialized size -- CS.writeName includes a 2-byte length prefix + return 8 + 4 + 2 + name_.length; + } + + public byte[] name() + { + return name_; + } + } +}
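For reference, the index layout that serialize() above writes (and that skipIndex()/skipBloomFilterAndIndex() step over) is: a presence boolean, the index size in bytes, then one record per ColumnIndexInfo of (position:long, count:int, name with a 2-byte length prefix) -- exactly what ColumnIndexInfo.size() = 8 + 4 + 2 + name_.length accounts for. The standalone sketch below round-trips that layout with plain JDK streams. It is not code from the patch: the class name IndexLayoutDemo is illustrative, and a raw writeShort() stands in for ColumnSerializer.writeName(), so treat it as an approximation of the on-disk format.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexLayoutDemo
{
    public static void main(String[] args) throws IOException
    {
        byte[] name = "col1".getBytes("UTF-8");
        int entrySize = 8 + 4 + 2 + name.length; // mirrors ColumnIndexInfo.size()

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bytes);
        dos.writeBoolean(true);      // column index present
        dos.writeInt(entrySize);     // size of the index that follows, in bytes
        dos.writeLong(0L);           // ColumnIndexInfo.position()
        dos.writeInt(32);            // ColumnIndexInfo.count()
        dos.writeShort(name.length); // 2-byte length prefix (stand-in for ColumnSerializer.writeName)
        dos.write(name);
        dos.flush();

        // Skipping mirrors skipIndex(): read the size, then skip that many bytes.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        if (in.readBoolean())
        {
            int columnIndexSize = in.readInt();
            in.skipBytes(columnIndexSize);
            System.out.println("skipped " + (4 + columnIndexSize) + " bytes of index");
        }
    }
}
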
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu Jul 30 15:30:21 2009 @@ -69,7 +69,7 @@ static String parseTableName(String filename) { - return new File(filename).getParentFile().getName(); + return new File(filename).getParentFile().getName(); } /** Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=799331&r1=799330&r2=799331&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jul 30 15:30:21 2009 @@ -1,434 +1,434 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.io; - -import java.io.*; -import java.util.*; - -import org.apache.log4j.Logger; - -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.SequenceFile.ColumnGroupReader; -import org.apache.cassandra.utils.BloomFilter; -import org.apache.cassandra.utils.FileUtils; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap; - -public class SSTableReader extends SSTable -{ - private static Logger logger = Logger.getLogger(SSTableReader.class); - - private static FileSSTableMap openedFiles = new FileSSTableMap(); - - public static int indexInterval() - { - return INDEX_INTERVAL; - } - - // todo can we refactor to take list of sstables? - public static int getApproximateKeyCount(List<String> dataFiles) - { - int count = 0; - - for (String dataFileName : dataFiles) - { - SSTableReader sstable = openedFiles.get(dataFileName); - assert sstable != null; - int indexKeyCount = sstable.getIndexPositions().size(); - count = count + (indexKeyCount + 1) * INDEX_INTERVAL; - if (logger.isDebugEnabled()) - logger.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count); - } - - return count; - } - - /** - * Get all indexed keys in the SSTable. 
- */ - public static List<String> getIndexedKeys() - { - List<String> indexedKeys = new ArrayList<String>(); - - for (SSTableReader sstable : openedFiles.values()) - { - for (KeyPosition kp : sstable.getIndexPositions()) - { - indexedKeys.add(kp.key); - } - } - Collections.sort(indexedKeys); - - return indexedKeys; - } - - public static synchronized SSTableReader open(String dataFileName) throws IOException - { - return open(dataFileName, StorageService.getPartitioner(), DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName))); - } - - public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner, double cacheFraction) throws IOException - { - SSTableReader sstable = openedFiles.get(dataFileName); - if (sstable == null) - { - assert partitioner != null; - sstable = new SSTableReader(dataFileName, partitioner); - - long start = System.currentTimeMillis(); - sstable.loadIndexFile(); - sstable.loadBloomFilter(); - if (cacheFraction > 0) - { - sstable.keyCache = createKeyCache((int)((sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL * cacheFraction)); - } - if (logger.isDebugEnabled()) - logger.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms."); - - openedFiles.put(dataFileName, sstable); - } - return sstable; - } - - public static synchronized SSTableReader get(String dataFileName) throws IOException - { - SSTableReader sstable = openedFiles.get(dataFileName); - assert sstable != null; - return sstable; - } - - public static ConcurrentLinkedHashMap<String, Long> createKeyCache(int size) - { - return ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, size); - } - - - private ConcurrentLinkedHashMap<String, Long> keyCache; - - SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<String, Long> keyCache) - { - super(filename, partitioner); - this.indexPositions = indexPositions; - this.bf = bloomFilter; - this.keyCache = keyCache; - synchronized (SSTableReader.this) - { - openedFiles.put(filename, this); - } - } - - private SSTableReader(String filename, IPartitioner partitioner) - { - super(filename, partitioner); - } - - public List<KeyPosition> getIndexPositions() - { - return indexPositions; - } - - private void loadBloomFilter() throws IOException - { - DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename())); - bf = BloomFilter.serializer().deserialize(stream); - } - - private void loadIndexFile() throws IOException - { - BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r"); - indexPositions = new ArrayList<KeyPosition>(); - - int i = 0; - long indexSize = input.length(); - while (true) - { - long indexPosition = input.getFilePointer(); - if (indexPosition == indexSize) - { - break; - } - String decoratedKey = input.readUTF(); - input.readLong(); - if (i++ % INDEX_INTERVAL == 0) - { - indexPositions.add(new KeyPosition(decoratedKey, indexPosition)); - } - } - } - - /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ - private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner) - { - assert indexPositions != null && indexPositions.size() > 0; - int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1)); - if (index < 0) - { - // binary search gives us the first index _greater_ than the key searched for, - 
// i.e., its insertion position - int greaterThan = (index + 1) * -1; - if (greaterThan == 0) - return -1; - return indexPositions.get(greaterThan - 1).position; - } - else - { - return indexPositions.get(index).position; - } - } - - /** - * returns the position in the data file to find the given key, or -1 if the key is not present - */ - public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException - { - if (!bf.isPresent(decoratedKey)) - return -1; - if (keyCache != null) - { - Long cachedPosition = keyCache.get(decoratedKey); - if (cachedPosition != null) - { - return cachedPosition; - } - } - long start = getIndexScanPosition(decoratedKey, partitioner); - if (start < 0) - { - return -1; - } - - // TODO mmap the index file? - BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r"); - input.seek(start); - int i = 0; - try - { - do - { - String indexDecoratedKey; - try - { - indexDecoratedKey = input.readUTF(); - } - catch (EOFException e) - { - return -1; - } - long position = input.readLong(); - int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey); - if (v == 0) - { - if (keyCache != null) - keyCache.put(decoratedKey, position); - return position; - } - if (v > 0) - return -1; - } while (++i < INDEX_INTERVAL); - } - finally - { - input.close(); - } - return -1; - } - - /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */ - public long getNearestPosition(String decoratedKey) throws IOException - { - long start = getIndexScanPosition(decoratedKey, partitioner); - if (start < 0) - { - return 0; - } - BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r"); - input.seek(start); - try - { - while (true) - { - String indexDecoratedKey; - try - { - indexDecoratedKey = input.readUTF(); - } - catch (EOFException e) - { - return -1; - } - long position = input.readLong(); - int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey); - if (v >= 0) - return position; - } - } - finally - { - input.close(); - } - } - - public DataInputBuffer next(final String clientKey, String cfName, SortedSet<byte[]> columnNames) throws IOException - { - IFileReader dataReader = null; - try - { - dataReader = SequenceFile.reader(dataFile); - String decoratedKey = partitioner.decorateKey(clientKey); - long position = getPosition(decoratedKey, partitioner); - - DataOutputBuffer bufOut = new DataOutputBuffer(); - DataInputBuffer bufIn = new DataInputBuffer(); - long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, position); - if (bytesRead != -1L) - { - if (bufOut.getLength() > 0) - { - bufIn.reset(bufOut.getData(), bufOut.getLength()); - /* read the key even though we do not use it */ - bufIn.readUTF(); - bufIn.readInt(); - } - } - return bufIn; - } - finally - { - if (dataReader != null) - { - dataReader.close(); - } - } - } - - /** - * obtain a BlockReader for the getColumnSlice call. - */ - public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException - { - IFileReader dataReader = SequenceFile.reader(dataFile); - - try - { - /* Morph key into actual key based on the partition type. 
*/ - String decoratedKey = partitioner.decorateKey(key); - long position = getPosition(decoratedKey, partitioner); - AbstractType comparator = DatabaseDescriptor.getComparator(getTableName(), cfName); - return new ColumnGroupReader(dataFile, decoratedKey, cfName, comparator, startColumn, isAscending, position); - } - finally - { - dataReader.close(); - } - } - - public void delete() throws IOException - { - FileUtils.deleteWithConfirm(new File(dataFile)); - FileUtils.deleteWithConfirm(new File(indexFilename(dataFile))); - FileUtils.deleteWithConfirm(new File(filterFilename(dataFile))); - openedFiles.remove(dataFile); - } - - /** obviously only for testing */ - public void forceBloomFilterFailures() - { - bf = BloomFilter.alwaysMatchingBloomFilter(); - } - - static void reopenUnsafe() throws IOException // testing only - { - Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values()); - openedFiles.clear(); - for (SSTableReader sstable : sstables) - { - SSTableReader.open(sstable.dataFile, sstable.partitioner, 0.01); - } - } - - IPartitioner getPartitioner() - { - return partitioner; - } - - public FileStruct getFileStruct() throws IOException - { - return new FileStruct(this); - } - - public String getTableName() - { - return parseTableName(dataFile); - } - - public static void deleteAll() throws IOException - { - for (SSTableReader sstable : openedFiles.values()) - { - sstable.delete(); - } - } -} - -class FileSSTableMap -{ - private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>(); - - public SSTableReader get(String filename) - { - try - { - return map.get(new File(filename).getCanonicalPath()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - public SSTableReader put(String filename, SSTableReader value) - { - try - { - return map.put(new File(filename).getCanonicalPath(), value); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - public Collection<SSTableReader> values() - { - return map.values(); - } - - public void clear() - { - map.clear(); - } - - public void remove(String filename) throws IOException - { - map.remove(new File(filename).getCanonicalPath()); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io; + +import java.io.*; +import java.util.*; + +import org.apache.log4j.Logger; + +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.SequenceFile.ColumnGroupReader; +import org.apache.cassandra.utils.BloomFilter; +import org.apache.cassandra.utils.FileUtils; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap; + +public class SSTableReader extends SSTable +{ + private static Logger logger = Logger.getLogger(SSTableReader.class); + + private static FileSSTableMap openedFiles = new FileSSTableMap(); + + public static int indexInterval() + { + return INDEX_INTERVAL; + } + + // todo can we refactor to take list of sstables? + public static int getApproximateKeyCount(List<String> dataFiles) + { + int count = 0; + + for (String dataFileName : dataFiles) + { + SSTableReader sstable = openedFiles.get(dataFileName); + assert sstable != null; + int indexKeyCount = sstable.getIndexPositions().size(); + count = count + (indexKeyCount + 1) * INDEX_INTERVAL; + if (logger.isDebugEnabled()) + logger.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count); + } + + return count; + } + + /** + * Get all indexed keys in the SSTable. + */ + public static List<String> getIndexedKeys() + { + List<String> indexedKeys = new ArrayList<String>(); + + for (SSTableReader sstable : openedFiles.values()) + { + for (KeyPosition kp : sstable.getIndexPositions()) + { + indexedKeys.add(kp.key); + } + } + Collections.sort(indexedKeys); + + return indexedKeys; + } + + public static synchronized SSTableReader open(String dataFileName) throws IOException + { + return open(dataFileName, StorageService.getPartitioner(), DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName))); + } + + public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner, double cacheFraction) throws IOException + { + SSTableReader sstable = openedFiles.get(dataFileName); + if (sstable == null) + { + assert partitioner != null; + sstable = new SSTableReader(dataFileName, partitioner); + + long start = System.currentTimeMillis(); + sstable.loadIndexFile(); + sstable.loadBloomFilter(); + if (cacheFraction > 0) + { + sstable.keyCache = createKeyCache((int)((sstable.getIndexPositions().size() + 1) * INDEX_INTERVAL * cacheFraction)); + } + if (logger.isDebugEnabled()) + logger.debug("INDEX LOAD TIME for " + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms."); + + openedFiles.put(dataFileName, sstable); + } + return sstable; + } + + public static synchronized SSTableReader get(String dataFileName) throws IOException + { + SSTableReader sstable = openedFiles.get(dataFileName); + assert sstable != null; + return sstable; + } + + public static ConcurrentLinkedHashMap<String, Long> createKeyCache(int size) + { + return ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.SECOND_CHANCE, size); + } + + + private ConcurrentLinkedHashMap<String, Long> keyCache; + + SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<String, Long> keyCache) + { + super(filename, partitioner); + this.indexPositions = indexPositions; + this.bf = bloomFilter; + this.keyCache = 
keyCache; + synchronized (SSTableReader.this) + { + openedFiles.put(filename, this); + } + } + + private SSTableReader(String filename, IPartitioner partitioner) + { + super(filename, partitioner); + } + + public List<KeyPosition> getIndexPositions() + { + return indexPositions; + } + + private void loadBloomFilter() throws IOException + { + DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename())); + bf = BloomFilter.serializer().deserialize(stream); + } + + private void loadIndexFile() throws IOException + { + BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r"); + indexPositions = new ArrayList<KeyPosition>(); + + int i = 0; + long indexSize = input.length(); + while (true) + { + long indexPosition = input.getFilePointer(); + if (indexPosition == indexSize) + { + break; + } + String decoratedKey = input.readUTF(); + input.readLong(); + if (i++ % INDEX_INTERVAL == 0) + { + indexPositions.add(new KeyPosition(decoratedKey, indexPosition)); + } + } + } + + /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ + private long getIndexScanPosition(String decoratedKey, IPartitioner partitioner) + { + assert indexPositions != null && indexPositions.size() > 0; + int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1)); + if (index < 0) + { + // binary search gives us the first index _greater_ than the key searched for, + // i.e., its insertion position + int greaterThan = (index + 1) * -1; + if (greaterThan == 0) + return -1; + return indexPositions.get(greaterThan - 1).position; + } + else + { + return indexPositions.get(index).position; + } + } + + /** + * returns the position in the data file to find the given key, or -1 if the key is not present + */ + public long getPosition(String decoratedKey, IPartitioner partitioner) throws IOException + { + if (!bf.isPresent(decoratedKey)) + return -1; + if (keyCache != null) + { + Long cachedPosition = keyCache.get(decoratedKey); + if (cachedPosition != null) + { + return cachedPosition; + } + } + long start = getIndexScanPosition(decoratedKey, partitioner); + if (start < 0) + { + return -1; + } + + // TODO mmap the index file? + BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r"); + input.seek(start); + int i = 0; + try + { + do + { + String indexDecoratedKey; + try + { + indexDecoratedKey = input.readUTF(); + } + catch (EOFException e) + { + return -1; + } + long position = input.readLong(); + int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey); + if (v == 0) + { + if (keyCache != null) + keyCache.put(decoratedKey, position); + return position; + } + if (v > 0) + return -1; + } while (++i < INDEX_INTERVAL); + } + finally + { + input.close(); + } + return -1; + } + + /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. 
*/ + public long getNearestPosition(String decoratedKey) throws IOException + { + long start = getIndexScanPosition(decoratedKey, partitioner); + if (start < 0) + { + return 0; + } + BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(dataFile), "r"); + input.seek(start); + try + { + while (true) + { + String indexDecoratedKey; + try + { + indexDecoratedKey = input.readUTF(); + } + catch (EOFException e) + { + return -1; + } + long position = input.readLong(); + int v = partitioner.getDecoratedKeyComparator().compare(indexDecoratedKey, decoratedKey); + if (v >= 0) + return position; + } + } + finally + { + input.close(); + } + } + + public DataInputBuffer next(final String clientKey, String cfName, SortedSet<byte[]> columnNames) throws IOException + { + IFileReader dataReader = null; + try + { + dataReader = SequenceFile.reader(dataFile); + String decoratedKey = partitioner.decorateKey(clientKey); + long position = getPosition(decoratedKey, partitioner); + + DataOutputBuffer bufOut = new DataOutputBuffer(); + DataInputBuffer bufIn = new DataInputBuffer(); + long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, position); + if (bytesRead != -1L) + { + if (bufOut.getLength() > 0) + { + bufIn.reset(bufOut.getData(), bufOut.getLength()); + /* read the key even though we do not use it */ + bufIn.readUTF(); + bufIn.readInt(); + } + } + return bufIn; + } + finally + { + if (dataReader != null) + { + dataReader.close(); + } + } + } + + /** + * obtain a BlockReader for the getColumnSlice call. + */ + public ColumnGroupReader getColumnGroupReader(String key, String cfName, byte[] startColumn, boolean isAscending) throws IOException + { + IFileReader dataReader = SequenceFile.reader(dataFile); + + try + { + /* Morph key into actual key based on the partition type. 
*/ + String decoratedKey = partitioner.decorateKey(key); + long position = getPosition(decoratedKey, partitioner); + AbstractType comparator = DatabaseDescriptor.getComparator(getTableName(), cfName); + return new ColumnGroupReader(dataFile, decoratedKey, cfName, comparator, startColumn, isAscending, position); + } + finally + { + dataReader.close(); + } + } + + public void delete() throws IOException + { + FileUtils.deleteWithConfirm(new File(dataFile)); + FileUtils.deleteWithConfirm(new File(indexFilename(dataFile))); + FileUtils.deleteWithConfirm(new File(filterFilename(dataFile))); + openedFiles.remove(dataFile); + } + + /** obviously only for testing */ + public void forceBloomFilterFailures() + { + bf = BloomFilter.alwaysMatchingBloomFilter(); + } + + static void reopenUnsafe() throws IOException // testing only + { + Collection<SSTableReader> sstables = new ArrayList<SSTableReader>(openedFiles.values()); + openedFiles.clear(); + for (SSTableReader sstable : sstables) + { + SSTableReader.open(sstable.dataFile, sstable.partitioner, 0.01); + } + } + + IPartitioner getPartitioner() + { + return partitioner; + } + + public FileStruct getFileStruct() throws IOException + { + return new FileStruct(this); + } + + public String getTableName() + { + return parseTableName(dataFile); + } + + public static void deleteAll() throws IOException + { + for (SSTableReader sstable : openedFiles.values()) + { + sstable.delete(); + } + } +} + +class FileSSTableMap +{ + private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>(); + + public SSTableReader get(String filename) + { + try + { + return map.get(new File(filename).getCanonicalPath()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public SSTableReader put(String filename, SSTableReader value) + { + try + { + return map.put(new File(filename).getCanonicalPath(), value); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public Collection<SSTableReader> values() + { + return map.values(); + } + + public void clear() + { + map.clear(); + } + + public void remove(String filename) throws IOException + { + map.remove(new File(filename).getCanonicalPath()); + } +}
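The key lookup above rests on a sampled index: loadIndexFile() keeps every INDEX_INTERVAL-th entry in memory, and getIndexScanPosition() binary-searches that sample, decoding a negative Collections.binarySearch() result via (index + 1) * -1 into the nearest preceding sample so that getPosition() scans at most INDEX_INTERVAL index entries from disk. The standalone sketch below reproduces just that decode step under stated simplifications: KeySample and scanPositionFor are illustrative names, and keys are compared with natural String order where the real code defers to the partitioner's decorated-key comparator.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SampledIndexDemo
{
    static class KeySample implements Comparable<KeySample>
    {
        final String key;
        final long position;

        KeySample(String key, long position)
        {
            this.key = key;
            this.position = position;
        }

        public int compareTo(KeySample rhs)
        {
            return key.compareTo(rhs.key); // simplification: natural order instead of the partitioner's comparator
        }
    }

    /** index-file offset to start scanning from, or -1 if the key sorts before every sample */
    static long scanPositionFor(List<KeySample> samples, String key)
    {
        int index = Collections.binarySearch(samples, new KeySample(key, -1));
        if (index >= 0)
            return samples.get(index).position;       // exact hit on a sampled key
        int greaterThan = (index + 1) * -1;           // decode the negative result into an insertion point
        if (greaterThan == 0)
            return -1;                                // key precedes the first sample
        return samples.get(greaterThan - 1).position; // nearest sample at or before the key
    }

    public static void main(String[] args)
    {
        List<KeySample> samples = new ArrayList<KeySample>();
        samples.add(new KeySample("apple", 0L));
        samples.add(new KeySample("mango", 4096L));
        samples.add(new KeySample("zebra", 8192L));
        System.out.println(scanPositionFor(samples, "mango")); // 4096: sampled key itself
        System.out.println(scanPositionFor(samples, "peach")); // 4096: scan forward from "mango"
        System.out.println(scanPositionFor(samples, "aaa"));   // -1: before all samples
    }
}
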