Author: jbellis
Date: Tue Apr 13 02:25:56 2010
New Revision: 933465
URL: http://svn.apache.org/viewvc?rev=933465&view=rev
Log:
fix index scans that cross the 2GB mmap boundaries for both mmap and standard
i/o modes.
patch by jbellis; tested by Todd Burruss and reviewed by gdusbabek for
CASSANDRA-866
Added:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Apr 13 02:25:56 2010
@@ -16,6 +16,8 @@
(CASSANDRA-969)
* Retrieve the correct number of undeleted columns, if any, from
a supercolumn in a row that had been deleted previously (CASSANDRA-920)
+ * fix index scans that cross the 2GB mmap boundaries for both mmap
+ and standard i/o modes (CASSANDRA-866)
0.6.0-RC1
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
Tue Apr 13 02:25:56 2010
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
public class IndexSummary
@@ -36,11 +37,13 @@ public class IndexSummary
private ArrayList<KeyPosition> indexPositions;
private Map<KeyPosition, SSTable.PositionSize> spannedIndexDataPositions;
private Map<Long, KeyPosition> spannedIndexPositions;
- int keysWritten = 0;
+ private int keysWritten = 0;
+ private long lastIndexPosition;
public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition,
long dataSize, long indexPosition, long nextIndexPosition)
{
- boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition)
!= SSTableReader.bufferIndex(nextIndexPosition);
+ boolean spannedIndexEntry = DatabaseDescriptor.getIndexAccessMode() ==
DatabaseDescriptor.DiskAccessMode.mmap
+ &&
SSTableReader.bufferIndex(indexPosition) !=
SSTableReader.bufferIndex(nextIndexPosition);
if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
{
if (indexPositions == null)
@@ -61,6 +64,7 @@ public class IndexSummary
spannedIndexPositions.put(info.indexPosition, info);
}
}
+ lastIndexPosition = indexPosition;
}
public List<KeyPosition> getIndexPositions()
@@ -73,14 +77,19 @@ public class IndexSummary
indexPositions.trimToSize();
}
- public SSTable.PositionSize getSpannedPosition(KeyPosition sampledPosition)
+ public SSTable.PositionSize getSpannedDataPosition(KeyPosition
sampledPosition)
{
if (spannedIndexDataPositions == null)
return null;
return spannedIndexDataPositions.get(sampledPosition);
}
- public SSTable.PositionSize getSpannedPosition(long nextIndexPosition)
+ public KeyPosition getSpannedIndexPosition(long nextIndexPosition)
+ {
+ return spannedIndexPositions == null ? null :
spannedIndexPositions.get(nextIndexPosition);
+ }
+
+ public SSTable.PositionSize getSpannedDataPosition(long nextIndexPosition)
{
if (spannedIndexDataPositions == null)
return null;
@@ -92,6 +101,12 @@ public class IndexSummary
return spannedIndexDataPositions.get(info);
}
+ public long getLastIndexPosition()
+ {
+ return lastIndexPosition;
+ }
+
+
/**
* This is a simple container for the index Key and its corresponding position
* in the index file. Binary search is performed on a list of these objects
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
Tue Apr 13 02:25:56 2010
@@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUti
import org.apache.cassandra.cache.InstrumentedCache;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -85,7 +86,8 @@ public class SSTableReader extends SSTab
};
new Thread(runnable, "SSTABLE-DELETER").start();
}};
- private static final long BUFFER_SIZE = Integer.MAX_VALUE;
+ // in a perfect world, BUFFER_SIZE would be final, but we need to test with a smaller size to stay sane.
+ static long BUFFER_SIZE = Integer.MAX_VALUE;
public static int indexInterval()
{
@@ -326,15 +328,9 @@ public class SSTableReader extends SSTab
if (sampledPosition == null)
return null;
- // handle exact sampled index hit
- PositionSize info = indexSummary.getSpannedPosition(sampledPosition);
- if (info != null)
- return info;
-
// get either a buffered or a mmap'd input for the on-disk index
long p = sampledPosition.indexPosition;
FileDataInput input;
- int bufferIndex = bufferIndex(p);
if (indexBuffers == null)
{
input = new BufferedRandomAccessFile(indexFilename(), "r");
@@ -342,7 +338,7 @@ public class SSTableReader extends SSTab
}
else
{
- input = new MappedFileDataInput(indexBuffers[bufferIndex],
indexFilename(), BUFFER_SIZE * bufferIndex, (int)(p % BUFFER_SIZE));
+ input = indexInputAt(p);
}
// scan the on-disk index, starting at the nearest sampled position
@@ -351,12 +347,33 @@ public class SSTableReader extends SSTab
int i = 0;
do
{
+ // handle exact sampled index hit
+ IndexSummary.KeyPosition kp =
indexSummary.getSpannedIndexPosition(input.getAbsolutePosition());
+ if (kp != null && kp.key.equals(decoratedKey))
+ return indexSummary.getSpannedDataPosition(kp);
+
// if using mmapped i/o, skip to the next mmap buffer if necessary
- if (input.isEOF() ||
indexSummary.getSpannedPosition(input.getAbsolutePosition()) != null)
+ if (input.isEOF() || kp != null)
{
- if (indexBuffers == null || ++bufferIndex ==
indexBuffers.length)
+ if (indexBuffers == null) // not mmap-ing, just one index
input
+ break;
+
+ FileDataInput oldInput = input;
+ if (kp == null)
+ {
+ input = indexInputAt(input.getAbsolutePosition());
+ }
+ else
+ {
+ long nextUnspannedPostion = input.getAbsolutePosition()
+ + 2 +
FBUtilities.encodedUTF8Length(StorageService.getPartitioner().convertToDiskFormat(kp.key))
+ + 8;
+ input = indexInputAt(nextUnspannedPostion);
+ }
+ oldInput.close();
+ if (input == null)
break;
- input = new MappedFileDataInput(indexBuffers[bufferIndex],
indexFilename(), BUFFER_SIZE * bufferIndex, 0);
+
continue;
}
@@ -367,7 +384,7 @@ public class SSTableReader extends SSTab
int v = indexDecoratedKey.compareTo(decoratedKey);
if (v == 0)
{
- info = getDataPositionSize(input, dataPosition);
+ PositionSize info = getDataPositionSize(input,
dataPosition);
if (keyCache != null && keyCache.getCapacity() > 0)
keyCache.put(unifiedKey, info);
return info;
@@ -378,11 +395,20 @@ public class SSTableReader extends SSTab
}
finally
{
- input.close();
+ if (input != null)
+ input.close();
}
return null;
}
+ private FileDataInput indexInputAt(long indexPosition)
+ {
+ if (indexPosition > indexSummary.getLastIndexPosition())
+ return null;
+ int bufferIndex = bufferIndex(indexPosition);
+ return new MappedFileDataInput(indexBuffers[bufferIndex],
indexFilename(), BUFFER_SIZE * bufferIndex, (int)(indexPosition % BUFFER_SIZE));
+ }
+
private PositionSize getDataPositionSize(FileDataInput input, long
dataPosition) throws IOException
{
// if we've reached the end of the index, then the row size is "the rest of the data file"
@@ -392,7 +418,7 @@ public class SSTableReader extends SSTab
// otherwise, row size is the start of the next row (in next index entry), minus the start of this one.
long nextIndexPosition = input.getAbsolutePosition();
// if next index entry would span mmap boundary, get the next row position from the summary instead
- PositionSize nextPositionSize =
indexSummary.getSpannedPosition(nextIndexPosition);
+ PositionSize nextPositionSize =
indexSummary.getSpannedDataPosition(nextIndexPosition);
if (nextPositionSize != null)
return new PositionSize(dataPosition, nextPositionSize.position -
dataPosition);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
Tue Apr 13 02:25:56 2010
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
public class ThriftValidation
{
@@ -46,18 +47,7 @@ public class ThriftValidation
throw new InvalidRequestException("Key may not be empty");
}
// check that writeUTF will be able to handle it -- encoded length must fit in 2 bytes
- int strlen = key.length();
- int utflen = 0;
- for (int i = 0; i < strlen; i++)
- {
- int c = key.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F))
- utflen++;
- else if (c > 0x07FF)
- utflen += 3;
- else
- utflen += 2;
- }
+ int utflen = FBUtilities.encodedUTF8Length(key);
if (utflen > 65535)
throw new InvalidRequestException("Encoded key length of " +
utflen + " is longer than maximum of 65535");
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Tue Apr 13 02:25:56 2010
@@ -463,4 +463,21 @@ public class FBUtilities
else
return a.equals(b);
}
+
+ public static int encodedUTF8Length(String st)
+ {
+ int strlen = st.length();
+ int utflen = 0;
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = st.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F))
+ utflen++;
+ else if (c > 0x07FF)
+ utflen += 3;
+ else
+ utflen += 2;
+ }
+ return utflen;
+ }
}
Added:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java?rev=933465&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
(added)
+++
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
Tue Apr 13 02:25:56 2010
@@ -0,0 +1,58 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.StorageService;
+
+
+public class SSTableReaderTest
+{
+ @Test
+ public void testSpannedIndexPositions() throws IOException,
ExecutionException, InterruptedException
+ {
+ SSTableReader.BUFFER_SIZE = 40;
+
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+ // insert a bunch of data
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 100; j += 2)
+ {
+ String key = String.valueOf(j);
+ RowMutation rm = new RowMutation("Keyspace1", key);
+ rm.add(new QueryPath("Standard1", null, "0".getBytes()), new
byte[0], j);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.submitMajor(store).get();
+
+ // check that all our keys are found correctly
+ SSTableReader sstable = store.getSSTables().iterator().next();
+ for (int j = 0; j < 100; j += 2)
+ {
+ String key = String.valueOf(j);
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ FileDataInput file = sstable.getFileDataInput(dk,
DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
+ DecoratedKey keyInDisk =
sstable.getPartitioner().convertFromDiskFormat(file.readUTF());
+ assert keyInDisk.equals(dk) : String.format("%s != %s in %s",
keyInDisk, dk, file.getPath());
+ }
+
+ // check no false positives
+ for (int j = 1; j < 110; j += 2)
+ {
+ String key = String.valueOf(j);
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ assert sstable.getPosition(dk) == null;
+ }
+ }
+}