Updated Branches: refs/heads/trunk 25f30559b -> c33ccd9e3
move IndexSummary off heap patch by Vijay; reviewed by jbellis for CASSANDRA-5521 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c33ccd9e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c33ccd9e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c33ccd9e Branch: refs/heads/trunk Commit: c33ccd9e38cae018022bec16e66489428bfc8912 Parents: 25f3055 Author: Vijay Parthasarathy <[email protected]> Authored: Fri May 3 01:13:44 2013 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Fri May 3 01:13:44 2013 -0700 ---------------------------------------------------------------------- src/java/org/apache/cassandra/db/DecoratedKey.java | 11 ++ .../db/compaction/AbstractCompactionStrategy.java | 2 +- .../apache/cassandra/io/sstable/Descriptor.java | 2 + .../apache/cassandra/io/sstable/IndexSummary.java | 94 ++++++++------- .../cassandra/io/sstable/IndexSummaryBuilder.java | 35 ++++-- .../apache/cassandra/io/sstable/SSTableReader.java | 14 ++- src/java/org/apache/cassandra/io/util/Memory.java | 12 ++ .../cassandra/io/util/MemoryInputStream.java | 7 +- .../org/apache/cassandra/utils/FBUtilities.java | 21 +++- .../cassandra/io/sstable/IndexSummaryTest.java | 87 +++++++++++++ .../cassandra/io/sstable/SSTableReaderTest.java | 5 +- 11 files changed, 224 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/db/DecoratedKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java index fc4bb32..8f7a22b 100644 --- a/src/java/org/apache/cassandra/db/DecoratedKey.java +++ b/src/java/org/apache/cassandra/db/DecoratedKey.java @@ -86,6 +86,17 @@ public class DecoratedKey extends RowPosition return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp; } + public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position) + { + // delegate to Token.KeyBound if needed + if (!(position instanceof DecoratedKey)) + return -position.compareTo(partitioner.decorateKey(key)); + + DecoratedKey otherKey = (DecoratedKey) position; + int cmp = partitioner.getToken(key).compareTo(otherKey.getToken()); + return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp; + } + public boolean isMinimum(IPartitioner partitioner) { // A DecoratedKey can never be the minimum position on the ring http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index ba89aa3..d719ad2 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -253,7 +253,7 @@ public abstract class AbstractCompactionStrategy else { // what percentage of columns do we expect to compact outside of overlap? - if (sstable.getKeySamples().length < 2) + if (sstable.getKeySampleSize() < 2) { // we have too few samples to estimate correct percentage return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 51256ff..e2a7a18 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -62,6 +62,7 @@ public class Descriptor public final boolean hasSuperColumns; public final boolean tracksMaxLocalDeletionTime; public final boolean hasBloomFilterFPChance; + public final boolean offHeapSummaries; public Version(String version) { @@ -70,6 +71,7 @@ public class Descriptor isLatestVersion = version.compareTo(current_version) == 0; hasSuperColumns = version.compareTo("ja") < 0; hasBloomFilterFPChance = version.compareTo("ja") >= 0; + offHeapSummaries = version.compareTo("ja") >= 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/sstable/IndexSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java index f91d10c..be7977e 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java @@ -17,51 +17,45 @@ */ package org.apache.cassandra.io.sstable; -import java.io.DataInput; -import java.io.DataOutput; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.io.util.Memory; +import org.apache.cassandra.io.util.MemoryInputStream; +import org.apache.cassandra.io.util.MemoryOutputStream; +import org.apache.cassandra.utils.FBUtilities; -public class IndexSummary +public class IndexSummary implements Closeable { public static final IndexSummarySerializer serializer = new IndexSummarySerializer(); private final int indexInterval; - private final long[] positions; - private final byte[][] keys; private final IPartitioner partitioner; + private final int summary_size; + private final Memory bytes; - public IndexSummary(IPartitioner partitioner, byte[][] keys, long[] positions, int indexInterval) + public IndexSummary(IPartitioner partitioner, Memory memory, int summary_size, int indexInterval) { this.partitioner = partitioner; this.indexInterval = indexInterval; - assert keys != null && keys.length > 0; - assert keys.length == positions.length; - - this.keys = keys; - this.positions = positions; - } - - public byte[][] getKeys() - { - return keys; + this.summary_size = summary_size; + this.bytes = memory; } // binary search is notoriously more difficult to get right than it looks; this is lifted from // Harmony's Collections implementation public int binarySearch(RowPosition key) { - int low = 0, mid = keys.length, high = mid - 1, result = -1; - + int low = 0, mid = summary_size, high = mid - 1, result = -1; while (low <= high) { mid = (low + high) >> 1; - result = -partitioner.decorateKey(ByteBuffer.wrap(keys[mid])).compareTo(key); - + result = -DecoratedKey.compareTo(partitioner, ByteBuffer.wrap(getKey(mid)), key); if (result > 0) { low = mid + 1; @@ -79,14 +73,29 @@ public class IndexSummary return -mid - (result < 0 ? 1 : 2); } + public int getIndex(int index) + { + // multiply by 4. + return bytes.getInt(index << 2); + } + public byte[] getKey(int index) { - return keys[index]; + long start = getIndex(index); + int keySize = (int) (caclculateEnd(index) - start - 8L); + byte[] key = new byte[keySize]; + bytes.getBytes(start, key, 0, keySize); + return key; } public long getPosition(int index) { - return positions[index]; + return bytes.getLong(caclculateEnd(index) - 8); + } + + private long caclculateEnd(int index) + { + return index == (summary_size - 1) ? bytes.size() : getIndex(index + 1); } public int getIndexInterval() @@ -96,36 +105,33 @@ public class IndexSummary public int size() { - return positions.length; + return summary_size; } public static class IndexSummarySerializer { - public void serialize(IndexSummary t, DataOutput out) throws IOException + public void serialize(IndexSummary t, DataOutputStream out) throws IOException { out.writeInt(t.indexInterval); - out.writeInt(t.keys.length); - for (int i = 0; i < t.keys.length; i++) - { - out.writeLong(t.getPosition(i)); - ByteBufferUtil.writeWithLength(t.keys[i], out); - } + out.writeInt(t.summary_size); + out.writeLong(t.bytes.size()); + FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size()); } - public IndexSummary deserialize(DataInput in, IPartitioner partitioner) throws IOException + public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner) throws IOException { int indexInterval = in.readInt(); - int size = in.readInt(); - long[] positions = new long[size]; - byte[][] keys = new byte[size][]; - - for (int i = 0; i < size; i++) - { - positions[i] = in.readLong(); - keys[i] = ByteBufferUtil.readBytes(in, in.readInt()); - } - - return new IndexSummary(partitioner, keys, positions, indexInterval); + int summary_size = in.readInt(); + long offheap_size = in.readLong(); + Memory memory = Memory.allocate(offheap_size); + FBUtilities.copy(in, new MemoryOutputStream(memory), offheap_size); + return new IndexSummary(partitioner, memory, summary_size, indexInterval); } } + + @Override + public void close() throws IOException + { + bytes.free(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java index 7b06ee5..1fa2912 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java @@ -19,14 +19,13 @@ package org.apache.cassandra.io.sstable; import java.util.ArrayList; -import com.google.common.primitives.Longs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.utils.ByteBufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexSummaryBuilder { @@ -36,6 +35,7 @@ public class IndexSummaryBuilder private final ArrayList<byte[]> keys; private final int indexInterval; private long keysWritten = 0; + private long offheapSize = 0; public IndexSummaryBuilder(long expectedKeys, int indexInterval) { @@ -58,8 +58,11 @@ public class IndexSummaryBuilder { if (keysWritten % indexInterval == 0) { - keys.add(ByteBufferUtil.getArray(decoratedKey.key)); + byte[] key = ByteBufferUtil.getArray(decoratedKey.key); + keys.add(key); + offheapSize += key.length; positions.add(indexPosition); + offheapSize += TypeSizes.NATIVE.sizeof(indexPosition); } keysWritten++; @@ -68,10 +71,24 @@ public class IndexSummaryBuilder public IndexSummary build(IPartitioner partitioner) { - byte[][] keysArray = new byte[keys.size()][]; + assert keys != null && keys.size() > 0; + assert keys.size() == positions.size(); + + Memory memory = Memory.allocate(offheapSize + (keys.size() * 4)); + int idxPosition = 0; + int keyPosition = keys.size() * 4; for (int i = 0; i < keys.size(); i++) - keysArray[i] = keys.get(i); + { + memory.setInt(idxPosition, keyPosition); + idxPosition += TypeSizes.NATIVE.sizeof(keyPosition); - return new IndexSummary(partitioner, keysArray, Longs.toArray(positions), indexInterval); + byte[] temp = keys.get(i); + memory.setBytes(keyPosition, temp, 0, temp.length); + keyPosition += temp.length; + long tempPosition = positions.get(i); + memory.setLong(keyPosition, tempPosition); + keyPosition += TypeSizes.NATIVE.sizeof(tempPosition); + } + return new IndexSummary(partitioner, memory, keys.size(), indexInterval); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 33a302d..a09f65b 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -110,7 +110,7 @@ public class SSTableReader extends SSTable for (SSTableReader sstable : sstables) { - int indexKeyCount = sstable.getKeySamples().length; + int indexKeyCount = sstable.getKeySampleSize(); count = count + (indexKeyCount + 1) * metadata.getIndexInterval(); if (logger.isDebugEnabled()) logger.debug("index size for bloom filter calc for file : " + sstable.getFilename() + " : " + count); @@ -429,7 +429,7 @@ public class SSTableReader extends SSTable public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata) { File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); - if (!summariesFile.exists()) + if (!reader.descriptor.version.offHeapSummaries || !summariesFile.exists()) return false; DataInputStream iStream = null; @@ -574,9 +574,14 @@ public class SSTableReader extends SSTable /** * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable. */ - public byte[][] getKeySamples() + public int getKeySampleSize() { - return indexSummary.getKeys(); + return indexSummary.size(); + } + + public byte[] getKeySample(int position) + { + return indexSummary.getKey(position); } private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges) @@ -953,6 +958,7 @@ public class SSTableReader extends SSTable deletingTask.schedule(); // close the BF so it can be opened later. FileUtils.closeQuietly(bf); + FileUtils.closeQuietly(indexSummary); } assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/util/Memory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java index 9ed6981..051b427 100644 --- a/src/java/org/apache/cassandra/io/util/Memory.java +++ b/src/java/org/apache/cassandra/io/util/Memory.java @@ -66,6 +66,12 @@ public class Memory unsafe.putLong(peer + offset, l); } + public void setInt(long offset, int l) + { + checkPosition(offset); + unsafe.putInt(peer + offset, l); + } + /** * Transfers count bytes from buffer to Memory * @@ -104,6 +110,12 @@ public class Memory return unsafe.getLong(peer + offset); } + public int getInt(long offset) + { + checkPosition(offset); + return unsafe.getInt(peer + offset); + } + /** * Transfers count bytes from Memory starting at memoryOffset to buffer starting at bufferOffset * http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/io/util/MemoryInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java index b7fb5ab..eee030a 100644 --- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java +++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java @@ -19,15 +19,12 @@ package org.apache.cassandra.io.util; import java.io.IOException; -import org.apache.cassandra.cache.RefCountedMemory; - - public class MemoryInputStream extends AbstractDataInput { - private final RefCountedMemory mem; + private final Memory mem; private int position = 0; - public MemoryInputStream(RefCountedMemory mem) + public MemoryInputStream(Memory mem) { this.mem = mem; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index e606e06..a7e2775 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -37,7 +37,6 @@ import java.util.zip.Checksum; import com.google.common.base.Joiner; import com.google.common.collect.AbstractIterator; -import com.google.common.primitives.Longs; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -625,4 +624,24 @@ public class FBUtilities throw new AssertionError(e); } } + + public static long copy(InputStream from, OutputStream to, long limit) throws IOException + { + byte[] buffer = new byte[64]; // 64 byte buffer + long copied = 0; + int toCopy = buffer.length; + while (true) + { + if (limit < buffer.length + copied) + toCopy = (int) (limit - copied); + int sofar = from.read(buffer, 0, toCopy); + if (sofar == -1) + break; + to.write(buffer, 0, sofar); + copied += sofar; + if (limit == copied) + break; + } + return copied; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java new file mode 100644 index 0000000..f5c7350 --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java @@ -0,0 +1,87 @@ +package org.apache.cassandra.io.sstable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import junit.framework.Assert; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class IndexSummaryTest +{ + @Test + public void testGetKey() + { + Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1); + for (int i = 0; i < 100; i++) + Assert.assertEquals(random.left.get(i).key, ByteBuffer.wrap(random.right.getKey(i))); + } + + @Test + public void testBinarySearch() + { + Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1); + for (int i = 0; i < 100; i++) + Assert.assertEquals(i, random.right.binarySearch(random.left.get(i))); + } + + @Test + public void testGetPosition() + { + Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 2); + for (int i = 0; i < 50; i++) + Assert.assertEquals(i*2, random.right.getPosition(i)); + } + + @Test + public void testSerialization() throws IOException + { + Pair<List<DecoratedKey>, IndexSummary> random = generateRandomIndex(100, 1); + ByteArrayOutputStream aos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(aos); + IndexSummary.serializer.serialize(random.right, dos); + // write junk + dos.writeUTF("JUNK"); + dos.writeUTF("JUNK"); + FileUtils.closeQuietly(dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray())); + IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner()); + for (int i = 0; i < 100; i++) + Assert.assertEquals(i, is.binarySearch(random.left.get(i))); + // read the junk + Assert.assertEquals(dis.readUTF(), "JUNK"); + Assert.assertEquals(dis.readUTF(), "JUNK"); + FileUtils.closeQuietly(dis); + } + + private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval) + { + List<DecoratedKey> list = Lists.newArrayList(); + IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval); + for (int i = 0; i < size; i++) + { + UUID uuid = UUID.randomUUID(); + DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid)); + list.add(key); + } + Collections.sort(list); + for (int i = 0; i < size; i++) + builder.maybeAddEntry(list.get(i), i); + IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner()); + return Pair.create(list, summary); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c33ccd9e/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java index 637d60a..86fed06 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.*; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -255,8 +256,8 @@ public class SSTableReaderTest extends SchemaLoader // test to see if sstable can be opened as expected SSTableReader target = SSTableReader.open(desc); - byte[][] keySamples = target.getKeySamples(); - assert keySamples.length == 1 && Arrays.equals(keySamples[0], firstKey.key.array()); + Assert.assertEquals(target.getKeySampleSize(), 1); + Assert.assertArrayEquals(ByteBufferUtil.getArray(firstKey.key), target.getKeySample(0)); assert target.first.equals(firstKey); assert target.last.equals(lastKey); }
