Author: jbellis
Date: Sat Mar 20 01:14:52 2010
New Revision: 925514
URL: http://svn.apache.org/viewvc?rev=925514&view=rev
Log: (empty)
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/contrib/ (props changed)
cassandra/branches/cassandra-0.6/contrib/client_only/ (props changed)
cassandra/branches/cassandra-0.6/contrib/word_count/ (props changed)
cassandra/branches/cassandra-0.6/interface/ (props changed)
cassandra/branches/cassandra-0.6/src/ (props changed)
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.6/test/ (props changed)
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sat Mar 20 01:14:52 2010
@@ -3,6 +3,7 @@
* Bootstrapping can skip ranges under the right conditions (CASSANDRA-902)
* fix merging row versions in range_slice for CL > ONE (CASSANDRA-884)
* default write ConsistencyLeven chaned from ZERO to ONE
+ * fix for index entries spanning mmap buffer boundaries (CASSANDRA-857)
* use lexical comparison if time part of TimeUUIDs are the same
(CASSANDRA-907)
* bound read, mutation, and response stages to fix possible OOM
Propchange: cassandra/branches/cassandra-0.6/contrib/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml
Propchange: cassandra/branches/cassandra-0.6/contrib/client_only/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml
Propchange: cassandra/branches/cassandra-0.6/contrib/word_count/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml
Propchange: cassandra/branches/cassandra-0.6/interface/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -1 +1,2 @@
avro
+*.iml
Propchange: cassandra/branches/cassandra-0.6/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -1,2 +1,2 @@
gen-java
-
+*.iml
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Sat Mar 20 01:14:52 2010
@@ -49,10 +49,7 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableScanner;
-import org.apache.cassandra.io.SSTableTracker;
+import org.apache.cassandra.io.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SliceRange;
@@ -1128,10 +1125,10 @@ public class ColumnFamilyStore implement
return Iterables.concat(stores);
}
- public Iterable<SSTable.KeyPosition> allIndexPositions()
+ public Iterable<IndexSummary.KeyPosition> allIndexPositions()
{
Collection<SSTableReader> sstables = getSSTables();
- Iterable<SSTable.KeyPosition>[] positions = new
Iterable[sstables.size()];
+ Iterable<IndexSummary.KeyPosition>[] positions = new
Iterable[sstables.size()];
int i = 0;
for (SSTableReader sstable: sstables)
{
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java?rev=925514&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
(added)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
Sat Mar 20 01:14:52 2010
@@ -0,0 +1,101 @@
+package org.apache.cassandra.io;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.DecoratedKey;
+
+public class IndexSummary
+{
+ /** Every 128th index entry is loaded into memory so we know where to
start looking for the actual key w/o seeking */
+ public static final int INDEX_INTERVAL = 128;/* Required extension for
temporary files created during compactions. */
+
+ private ArrayList<KeyPosition> indexPositions;
+ private Map<KeyPosition, SSTable.PositionSize> spannedIndexDataPositions;
+ private Map<Long, KeyPosition> spannedIndexPositions;
+ int keysWritten = 0;
+
+ public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition,
long dataSize, long indexPosition, long nextIndexPosition)
+ {
+ boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition)
!= SSTableReader.bufferIndex(nextIndexPosition);
+ if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
+ {
+ if (indexPositions == null)
+ {
+ indexPositions = new ArrayList<KeyPosition>();
+ }
+ KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
+ indexPositions.add(info);
+
+ if (spannedIndexEntry)
+ {
+ if (spannedIndexDataPositions == null)
+ {
+ spannedIndexDataPositions = new HashMap<KeyPosition,
SSTable.PositionSize>();
+ spannedIndexPositions = new HashMap<Long, KeyPosition>();
+ }
+ spannedIndexDataPositions.put(info, new
SSTable.PositionSize(dataPosition, dataSize));
+ spannedIndexPositions.put(info.indexPosition, info);
+ }
+ }
+ }
+
+ public List<KeyPosition> getIndexPositions()
+ {
+ return indexPositions;
+ }
+
+ public void complete()
+ {
+ indexPositions.trimToSize();
+ }
+
+ public SSTable.PositionSize getSpannedPosition(KeyPosition sampledPosition)
+ {
+ if (spannedIndexDataPositions == null)
+ return null;
+ return spannedIndexDataPositions.get(sampledPosition);
+ }
+
+ public SSTable.PositionSize getSpannedPosition(long nextIndexPosition)
+ {
+ if (spannedIndexDataPositions == null)
+ return null;
+
+ KeyPosition info = spannedIndexPositions.get(nextIndexPosition);
+ if (info == null)
+ return null;
+
+ return spannedIndexDataPositions.get(info);
+ }
+
+ /**
+ * This is a simple container for the index Key and its corresponding
position
+ * in the index file. Binary search is performed on a list of these objects
+ * to find where to start looking for the index entry containing the data
position
+ * (which will be turned into a PositionSize object)
+ */
+ public static class KeyPosition implements Comparable<KeyPosition>
+ {
+ public final DecoratedKey key;
+ public final long indexPosition;
+
+ public KeyPosition(DecoratedKey key, long indexPosition)
+ {
+ this.key = key;
+ this.indexPosition = indexPosition;
+ }
+
+ public int compareTo(KeyPosition kp)
+ {
+ return key.compareTo(kp.key);
+ }
+
+ public String toString()
+ {
+ return key + ":" + indexPosition;
+ }
+ }
+}
\ No newline at end of file
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
Sat Mar 20 01:14:52 2010
@@ -25,7 +25,6 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Arrays;
-import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.commons.lang.StringUtils;
@@ -56,12 +55,9 @@ public abstract class SSTable
protected String path;
protected IPartitioner partitioner;
protected BloomFilter bf;
- protected List<KeyPosition> indexPositions;
- protected Map<KeyPosition, PositionSize> spannedIndexDataPositions; // map
of index position, to data position, for index entries spanning mmap segments
protected String columnFamilyName;
+ protected IndexSummary indexSummary;
- /* Every 128th index entry is loaded into memory so we know where to start
looking for the actual key w/o seeking */
- public static final int INDEX_INTERVAL = 128;/* Required extension for
temporary files created during compactions. */
public static final String TEMPFILE_MARKER = "tmp";
public SSTable(String filename, IPartitioner partitioner)
@@ -173,33 +169,6 @@ public abstract class SSTable
return sum;
}
- /**
- * This is a simple container for the index Key and its corresponding
position
- * in the data file. Binary search is performed on a list of these objects
- * to lookup keys within the SSTable data file.
- */
- public class KeyPosition implements Comparable<KeyPosition>
- {
- public final DecoratedKey key;
- public final long position;
-
- public KeyPosition(DecoratedKey key, long position)
- {
- this.key = key;
- this.position = position;
- }
-
- public int compareTo(KeyPosition kp)
- {
- return key.compareTo(kp.key);
- }
-
- public String toString()
- {
- return key + ":" + position;
- }
- }
-
public long bytesOnDisk()
{
long bytes = 0;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
Sat Mar 20 01:14:52 2010
@@ -89,7 +89,7 @@ public class SSTableReader extends SSTab
public static int indexInterval()
{
- return INDEX_INTERVAL;
+ return IndexSummary.INDEX_INTERVAL;
}
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
@@ -99,7 +99,7 @@ public class SSTableReader extends SSTab
for (SSTableReader sstable : sstables)
{
int indexKeyCount = sstable.getIndexPositions().size();
- count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
+ count = count + (indexKeyCount + 1) * IndexSummary.INDEX_INTERVAL;
if (logger.isDebugEnabled())
logger.debug("index size for bloom filter calc for file : " +
sstable.getFilename() + " : " + count);
}
@@ -136,10 +136,7 @@ public class SSTableReader extends SSTab
private InstrumentedCache<Pair<String, DecoratedKey>, PositionSize>
keyCache;
- SSTableReader(String filename,
- IPartitioner partitioner,
- List<KeyPosition> indexPositions, Map<KeyPosition,
PositionSize> spannedIndexDataPositions,
- BloomFilter bloomFilter)
+ SSTableReader(String filename, IPartitioner partitioner, IndexSummary
indexSummary, BloomFilter bloomFilter)
throws IOException
{
super(filename, partitioner);
@@ -179,8 +176,7 @@ public class SSTableReader extends SSTab
buffers = null;
}
- this.indexPositions = indexPositions;
- this.spannedIndexDataPositions = spannedIndexDataPositions;
+ this.indexSummary = indexSummary;
this.bf = bloomFilter;
}
@@ -217,17 +213,17 @@ public class SSTableReader extends SSTab
private SSTableReader(String filename, IPartitioner partitioner) throws
IOException
{
- this(filename, partitioner, null, null, null);
+ this(filename, partitioner, null, null);
}
- public List<KeyPosition> getIndexPositions()
+ public List<IndexSummary.KeyPosition> getIndexPositions()
{
- return indexPositions;
+ return indexSummary.getIndexPositions();
}
public long estimatedKeys()
{
- return indexPositions.size() * INDEX_INTERVAL;
+ return indexSummary.getIndexPositions().size() *
IndexSummary.INDEX_INTERVAL;
}
void loadBloomFilter() throws IOException
@@ -245,14 +241,13 @@ public class SSTableReader extends SSTab
void loadIndexFile() throws IOException
{
- indexPositions = new ArrayList<KeyPosition>();
// we read the positions in a BRAF so we don't have to worry about an
entry spanning a mmap boundary.
// any entries that do, we force into the in-memory sample so key
lookup can always bsearch within
// a single mmapped segment.
+ indexSummary = new IndexSummary();
BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(), "r");
try
{
- int i = 0;
long indexSize = input.length();
while (true)
{
@@ -264,27 +259,21 @@ public class SSTableReader extends SSTab
DecoratedKey decoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
long dataPosition = input.readLong();
long nextIndexPosition = input.getFilePointer();
- boolean spannedEntry = bufferIndex(indexPosition) !=
bufferIndex(nextIndexPosition);
- if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
+ // read the next index entry to see how big the row is
+ long nextDataPosition;
+ if (input.isEOF())
+ {
+ nextDataPosition = length();
+ }
+ else
{
- KeyPosition info;
- info = new KeyPosition(decoratedKey, indexPosition);
- indexPositions.add(info);
-
- if (spannedEntry)
- {
- if (spannedIndexDataPositions == null)
- {
- spannedIndexDataPositions = new
HashMap<KeyPosition, PositionSize>();
- }
- // read the next index entry to see how big the row is
corresponding to the current, mmap-segment-spanning one
- input.readUTF();
- long nextDataPosition = input.readLong();
- input.seek(nextIndexPosition);
- spannedIndexDataPositions.put(info, new
PositionSize(dataPosition, nextDataPosition - dataPosition));
- }
+ input.readUTF();
+ nextDataPosition = input.readLong();
+ input.seek(nextIndexPosition);
}
+ indexSummary.maybeAddEntry(decoratedKey, dataPosition,
nextDataPosition - dataPosition, indexPosition, nextIndexPosition);
}
+ indexSummary.complete();
}
finally
{
@@ -293,10 +282,10 @@ public class SSTableReader extends SSTab
}
/** get the position in the index file to start scanning to find the given
key (at most indexInterval keys away) */
- private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+ private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey
decoratedKey)
{
- assert indexPositions != null && indexPositions.size() > 0;
- int index = Collections.binarySearch(indexPositions, new
KeyPosition(decoratedKey, -1));
+ assert indexSummary.getIndexPositions() != null &&
indexSummary.getIndexPositions().size() > 0;
+ int index = Collections.binarySearch(indexSummary.getIndexPositions(),
new IndexSummary.KeyPosition(decoratedKey, -1));
if (index < 0)
{
// binary search gives us the first index _greater_ than the key
searched for,
@@ -304,11 +293,11 @@ public class SSTableReader extends SSTab
int greaterThan = (index + 1) * -1;
if (greaterThan == 0)
return null;
- return indexPositions.get(greaterThan - 1);
+ return indexSummary.getIndexPositions().get(greaterThan - 1);
}
else
{
- return indexPositions.get(index);
+ return indexSummary.getIndexPositions().get(index);
}
}
@@ -333,23 +322,19 @@ public class SSTableReader extends SSTab
}
// next, see if the sampled index says it's impossible for the key to
be present
- KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ IndexSummary.KeyPosition sampledPosition =
getIndexScanPosition(decoratedKey);
if (sampledPosition == null)
- {
return null;
- }
// handle exact sampled index hit
- if (spannedIndexDataPositions != null)
- {
- PositionSize info = spannedIndexDataPositions.get(sampledPosition);
- if (info != null)
- return info;
- }
+ PositionSize info = indexSummary.getSpannedPosition(sampledPosition);
+ if (info != null)
+ return info;
- // scan the on-disk index, starting at the nearest sampled position
- long p = sampledPosition.position;
+ // get either a buffered or a mmap'd input for the on-disk index
+ long p = sampledPosition.indexPosition;
FileDataInput input;
+ int bufferIndex = bufferIndex(p);
if (indexBuffers == null)
{
input = new BufferedRandomAccessFile(indexFilename(), "r");
@@ -357,45 +342,39 @@ public class SSTableReader extends SSTab
}
else
{
- input = new MappedFileDataInput(indexBuffers[bufferIndex(p)],
indexFilename(), (int)(p % BUFFER_SIZE));
+ input = new MappedFileDataInput(indexBuffers[bufferIndex],
indexFilename(), BUFFER_SIZE * bufferIndex, (int)(p % BUFFER_SIZE));
}
+
+ // scan the on-disk index, starting at the nearest sampled position
try
{
int i = 0;
do
{
- DecoratedKey indexDecoratedKey;
- try
+ // if using mmapped i/o, skip to the next mmap buffer if
necessary
+ if (input.isEOF() ||
indexSummary.getSpannedPosition(input.getAbsolutePosition()) != null)
{
- indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
- }
- catch (EOFException e)
- {
- return null;
+ if (indexBuffers == null || ++bufferIndex ==
indexBuffers.length)
+ break;
+ input = new MappedFileDataInput(indexBuffers[bufferIndex],
indexFilename(), BUFFER_SIZE * bufferIndex, 0);
+ continue;
}
- long position = input.readLong();
+
+ // read key & data position from index entry
+ DecoratedKey indexDecoratedKey =
partitioner.convertFromDiskFormat(input.readUTF());
+ long dataPosition = input.readLong();
+
int v = indexDecoratedKey.compareTo(decoratedKey);
if (v == 0)
{
- PositionSize info;
- if (!input.isEOF())
- {
- int utflen = input.readUnsignedShort();
- if (utflen != input.skipBytes(utflen))
- throw new EOFException();
- info = new PositionSize(position, input.readLong() -
position);
- }
- else
- {
- info = new PositionSize(position, length() - position);
- }
+ info = getDataPositionSize(input, dataPosition);
if (keyCache != null && keyCache.getCapacity() > 0)
keyCache.put(unifiedKey, info);
return info;
}
if (v > 0)
return null;
- } while (++i < INDEX_INTERVAL);
+ } while (++i < IndexSummary.INDEX_INTERVAL);
}
finally
{
@@ -404,10 +383,30 @@ public class SSTableReader extends SSTab
return null;
}
+ private PositionSize getDataPositionSize(FileDataInput input, long
dataPosition) throws IOException
+ {
+ // if we've reached the end of the index, then the row size is "the
rest of the data file"
+ if (input.isEOF())
+ return new PositionSize(dataPosition, length() - dataPosition);
+
+ // otherwise, row size is the start of the next row (in next index
entry), minus the start of this one.
+ long nextIndexPosition = input.getAbsolutePosition();
+ // if next index entry would span mmap boundary, get the next row
position from the summary instead
+ PositionSize nextPositionSize =
indexSummary.getSpannedPosition(nextIndexPosition);
+ if (nextPositionSize != null)
+ return new PositionSize(dataPosition, nextPositionSize.position -
dataPosition);
+
+ // read next entry directly
+ int utflen = input.readUnsignedShort();
+ if (utflen != input.skipBytes(utflen))
+ throw new EOFException();
+ return new PositionSize(dataPosition, input.readLong() - dataPosition);
+ }
+
/** like getPosition, but if key is not found will return the location of
the first key _greater_ than the desired one, or -1 if no such key exists. */
public long getNearestPosition(DecoratedKey decoratedKey) throws
IOException
{
- KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+ IndexSummary.KeyPosition sampledPosition =
getIndexScanPosition(decoratedKey);
if (sampledPosition == null)
{
return 0;
@@ -415,7 +414,7 @@ public class SSTableReader extends SSTab
// can't use a MappedFileDataInput here, since we might cross a
segment boundary while scanning
BufferedRandomAccessFile input = new
BufferedRandomAccessFile(indexFilename(path), "r");
- input.seek(sampledPosition.position);
+ input.seek(sampledPosition.indexPosition);
try
{
while (true)
@@ -490,7 +489,7 @@ public class SSTableReader extends SSTab
file.seek(info.position);
return file;
}
- return new MappedFileDataInput(buffers[bufferIndex(info.position)],
path, (int) (info.position % BUFFER_SIZE));
+ return new MappedFileDataInput(buffers[bufferIndex(info.position)],
path, BUFFER_SIZE * (info.position / BUFFER_SIZE), (int) (info.position %
BUFFER_SIZE));
}
static int bufferIndex(long position)
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
Sat Mar 20 01:14:52 2010
@@ -35,7 +35,6 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FBUtilities;
@@ -52,6 +51,7 @@ public class SSTableWriter extends SSTab
public SSTableWriter(String filename, long keyCount, IPartitioner
partitioner) throws IOException
{
super(filename, partitioner);
+ indexSummary = new IndexSummary();
dataFile = new BufferedRandomAccessFile(path, "rw",
(int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
indexFile = new BufferedRandomAccessFile(indexFilename(), "rw",
(int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
bf = BloomFilter.getFilter(keyCount, 15);
@@ -86,25 +86,7 @@ public class SSTableWriter extends SSTab
if (logger.isTraceEnabled())
logger.trace("wrote index of " + decoratedKey + " at " +
indexPosition);
- boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition)
!= SSTableReader.bufferIndex(indexFile.getFilePointer());
- if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
- {
- if (indexPositions == null)
- {
- indexPositions = new ArrayList<KeyPosition>();
- }
- KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
- indexPositions.add(info);
-
- if (spannedIndexEntry)
- {
- if (spannedIndexDataPositions == null)
- {
- spannedIndexDataPositions = new HashMap<KeyPosition,
PositionSize>();
- }
- spannedIndexDataPositions.put(info, new
PositionSize(dataPosition, dataSize));
- }
- }
+ indexSummary.maybeAddEntry(decoratedKey, dataPosition, dataSize,
indexPosition, indexFile.getFilePointer());
}
// TODO make this take a DataOutputStream and wrap the byte[] version to
combine them
@@ -153,7 +135,8 @@ public class SSTableWriter extends SSTab
rename(filterFilename());
path = rename(path); // important to do this last since index & filter
file names are derived from it
- return new SSTableReader(path, partitioner, indexPositions,
spannedIndexDataPositions, bf);
+ indexSummary.complete();
+ return new SSTableReader(path, partitioner, indexSummary, bf);
}
static String rename(String tmpFilename)
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
Sat Mar 20 01:14:52 2010
@@ -252,7 +252,12 @@ public class BufferedRandomAccessFile ex
}
this.curr_ = pos;
}
-
+
+ public long getAbsolutePosition()
+ {
+ return getFilePointer();
+ }
+
public long getFilePointer()
{
return this.curr_;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
Sat Mar 20 01:14:52 2010
@@ -36,4 +36,6 @@ public interface FileDataInput extends D
public void reset() throws IOException;
public int bytesPastMark();
+
+ long getAbsolutePosition();
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
Sat Mar 20 01:14:52 2010
@@ -30,21 +30,28 @@ public class MappedFileDataInput extends
private final String filename;
private int position;
private int markedPosition;
+ private final long absoluteStartPosition;
- public MappedFileDataInput(MappedByteBuffer buffer, String filename)
+ public MappedFileDataInput(MappedByteBuffer buffer, String filename, long
absoluteStartPosition)
{
- this(buffer, filename, 0);
+ this(buffer, filename, absoluteStartPosition, 0);
}
- public MappedFileDataInput(MappedByteBuffer buffer, String filename, int
position)
+ public MappedFileDataInput(MappedByteBuffer buffer, String filename, long
absoluteStartPosition, int position)
{
assert buffer != null;
+ this.absoluteStartPosition = absoluteStartPosition;
this.buffer = buffer;
this.filename = filename;
this.position = position;
}
- // don't make this public, this is only for seeking WITHIN the current
mapped segment
+ public long getAbsolutePosition()
+ {
+ return absoluteStartPosition + position;
+ }
+
+// don't make this public, this is only for seeking WITHIN the current mapped
segment
private void seekInternal(int pos) throws IOException
{
position = pos;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Sat Mar 20 01:14:52 2010
@@ -33,7 +33,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.IndexSummary;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.net.IVerbHandler;
@@ -44,7 +44,6 @@ import org.apache.cassandra.utils.*;
import org.apache.log4j.Logger;
import com.google.common.collect.Collections2;
-import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
/**
@@ -367,7 +366,7 @@ public class AntiEntropyService
}
if (cfs != null) // TODO test w/ valid CF definitions, this if{}
shouldn't be necessary
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
keys.add(info.key);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Sat Mar 20 01:14:52 2010
@@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.net.InetAddress;
@@ -41,7 +40,7 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.IndexSummary;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
@@ -1233,7 +1232,7 @@ public class StorageService implements I
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
{
if (range.contains(info.key.token))
keys.add(info.key);
@@ -1262,7 +1261,7 @@ public class StorageService implements I
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- for (SSTable.KeyPosition info: cfs.allIndexPositions())
+ for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
{
if (range.contains(info.key.token))
keys.add(info.key);
Propchange: cassandra/branches/cassandra-0.6/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml