Updated Branches: refs/heads/trunk c34ecbf68 -> dc37dea74
off-heap bloom filters for row keys patch by vijay; reviewed by jbellis for CASSANDRA-4865 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc37dea7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc37dea7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc37dea7 Branch: refs/heads/trunk Commit: dc37dea745fe89d70819d649c823d9bfcb0d7577 Parents: c34ecbf Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 31 10:54:47 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 31 11:22:47 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ColumnIndex.java | 2 +- .../org/apache/cassandra/db/RowIndexEntry.java | 2 +- .../apache/cassandra/io/sstable/IndexHelper.java | 2 +- .../apache/cassandra/io/sstable/SSTableReader.java | 4 +- .../apache/cassandra/io/sstable/SSTableWriter.java | 4 +- src/java/org/apache/cassandra/io/util/Memory.java | 20 ++ .../org/apache/cassandra/utils/BloomFilter.java | 25 +-- .../cassandra/utils/BloomFilterSerializer.java | 51 +---- src/java/org/apache/cassandra/utils/Filter.java | 3 +- .../org/apache/cassandra/utils/FilterFactory.java | 36 ++-- .../apache/cassandra/utils/LegacyBloomFilter.java | 6 + .../apache/cassandra/utils/Murmur2BloomFilter.java | 16 +- .../apache/cassandra/utils/Murmur3BloomFilter.java | 16 +- .../org/apache/cassandra/utils/obs/IBitSet.java | 52 +++++ .../apache/cassandra/utils/obs/OffHeapBitSet.java | 160 +++++++++++++++ .../org/apache/cassandra/utils/obs/OpenBitSet.java | 60 +++++- .../org/apache/cassandra/utils/LongBitSetTest.java | 133 ++++++++++++ .../cassandra/utils/LongBloomFilterTest.java | 6 +- .../org/apache/cassandra/utils/BitSetTest.java | 148 +++++++++++++ .../apache/cassandra/utils/BloomFilterTest.java | 9 +- .../cassandra/utils/LegacyBloomFilterTest.java | 2 +- .../apache/cassandra/utils/SerializationsTest.java | 14 +- 23 files changed, 657 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 11aaea1..7f4a728 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * off-heap bloom filters for row keys (CASSANDRA_4865) * add extension point for sstable components (CASSANDRA-4049) * improve tracing output (CASSANDRA-4852, 4862) * make TRACE verb droppable (CASSANDRA-4672) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 35ee899..946c4f4 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -36,7 +36,7 @@ public class ColumnIndex private ColumnIndex(int estimatedColumnCount) { - this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4)); + this(new ArrayList<IndexHelper.IndexInfo>(), FilterFactory.getFilter(estimatedColumnCount, 4, false)); } private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex, Filter bloomFilter) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index b7660e5..a3701f8 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -119,7 +119,7 @@ public class RowIndexEntry List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries); for (int i = 0; i < entries; i++) columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis)); - Filter bf = FilterFactory.deserialize(dis, version.filterType); + Filter bf = FilterFactory.deserialize(dis, version.filterType, false); return new IndexedEntry(position, delInfo, columnsIndex, bf); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/IndexHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java index 29e076a..a87ecf7 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java @@ -130,7 +130,7 @@ public class IndexHelper ByteBuffer bytes = file.readBytes(size); DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - return FilterFactory.deserialize(stream, type); + return FilterFactory.deserialize(stream, type, false); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 7957134..812a475 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -330,7 +330,7 @@ public class SSTableReader extends SSTable try { stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER)))); - bf = FilterFactory.deserialize(stream, descriptor.version.filterType); + bf = FilterFactory.deserialize(stream, descriptor.version.filterType, true); } finally { @@ -899,6 +899,8 @@ public class SSTableReader extends SSTable dfile.cleanup(); deletingTask.schedule(); + // close the BF so it can be opened later. + FileUtils.closeQuietly(bf); } assert references.get() >= 0 : "Reference counter " + references.get() + " for " + dfile.path; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index c17de4c..2627a77 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -445,8 +445,8 @@ public class SSTableWriter extends SSTable logger.error("Bloom filter FP chance of zero isn't supposed to happen"); fpChance = null; } - bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15) - : FilterFactory.getFilter(keyCount, fpChance); + bf = fpChance == null ? FilterFactory.getFilter(keyCount, 15, true) + : FilterFactory.getFilter(keyCount, fpChance, true); } public void append(DecoratedKey key, RowIndexEntry indexEntry) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/io/util/Memory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java index faef564..25f5caf 100644 --- a/src/java/org/apache/cassandra/io/util/Memory.java +++ b/src/java/org/apache/cassandra/io/util/Memory.java @@ -66,6 +66,13 @@ public class Memory unsafe.putByte(peer + offset, b); } + public void setMemory(long offset, long bytes, byte b) + { + // check if the last element will fit into the memory + checkPosition(offset + bytes - 1); + unsafe.setMemory(peer + offset, bytes, b); + } + /** * Transfers count bytes from buffer to Memory * @@ -139,5 +146,18 @@ public class Memory { return size; } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (!(o instanceof Memory)) + return false; + Memory b = (Memory) o; + if (peer == b.peer && size == b.size) + return true; + return false; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index 3ca62e5..469763a 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -17,23 +17,16 @@ */ package org.apache.cassandra.utils; +import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.cassandra.utils.obs.OpenBitSet; +import org.apache.cassandra.utils.obs.IBitSet; public abstract class BloomFilter extends Filter { - private static final int EXCESS = 20; + public final IBitSet bitset; - public final OpenBitSet bitset; - - BloomFilter(int hashes, long numElements, int bucketsPer) - { - hashCount = hashes; - bitset = new OpenBitSet(numElements * bucketsPer + EXCESS); - } - - BloomFilter(int hashes, OpenBitSet bitset) + BloomFilter(int hashes, IBitSet bitset) { this.hashCount = hashes; this.bitset = bitset; @@ -41,7 +34,7 @@ public abstract class BloomFilter extends Filter private long[] getHashBuckets(ByteBuffer key) { - return getHashBuckets(key, hashCount, bitset.size()); + return getHashBuckets(key, hashCount, bitset.capacity()); } protected abstract long[] hash(ByteBuffer b, int position, int remaining, long seed); @@ -84,6 +77,12 @@ public abstract class BloomFilter extends Filter public void clear() { - bitset.clear(0, bitset.size()); + bitset.clear(); + } + + @Override + public void close() throws IOException + { + bitset.close(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java index 68997c9..6b8b355 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java +++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java @@ -23,46 +23,31 @@ import java.io.IOException; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.utils.obs.IBitSet; +import org.apache.cassandra.utils.obs.OffHeapBitSet; import org.apache.cassandra.utils.obs.OpenBitSet; abstract class BloomFilterSerializer implements ISerializer<BloomFilter> { public void serialize(BloomFilter bf, DataOutput dos) throws IOException { - int bitLength = bf.bitset.getNumWords(); - int pageSize = bf.bitset.getPageSize(); - int pageCount = bf.bitset.getPageCount(); - dos.writeInt(bf.getHashCount()); - dos.writeInt(bitLength); - - for (int p = 0; p < pageCount; p++) - { - long[] bits = bf.bitset.getPage(p); - for (int i = 0; i < pageSize && bitLength-- > 0; i++) - dos.writeLong(bits[i]); - } + bf.bitset.serialize(dos); } public BloomFilter deserialize(DataInput dis) throws IOException { - int hashes = dis.readInt(); - long bitLength = dis.readInt(); - OpenBitSet bs = new OpenBitSet(bitLength << 6); - int pageSize = bs.getPageSize(); - int pageCount = bs.getPageCount(); - - for (int p = 0; p < pageCount; p++) - { - long[] bits = bs.getPage(p); - for (int i = 0; i < pageSize && bitLength-- > 0; i++) - bits[i] = dis.readLong(); - } + return deserialize(dis, false); + } + public BloomFilter deserialize(DataInput dis, boolean offheap) throws IOException + { + int hashes = dis.readInt(); + IBitSet bs = offheap ? OffHeapBitSet.deserialize(dis) : OpenBitSet.deserialize(dis); return createFilter(hashes, bs); } - protected abstract BloomFilter createFilter(int hashes, OpenBitSet bs); + protected abstract BloomFilter createFilter(int hashes, IBitSet bs); /** * Calculates a serialized size of the given Bloom Filter @@ -74,20 +59,8 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter> */ public long serializedSize(BloomFilter bf, TypeSizes typeSizes) { - int bitLength = bf.bitset.getNumWords(); - int pageSize = bf.bitset.getPageSize(); - int pageCount = bf.bitset.getPageCount(); - - int size = 0; - size += typeSizes.sizeof(bf.getHashCount()); // hash count - size += typeSizes.sizeof(bitLength); // length - - for (int p = 0; p < pageCount; p++) - { - long[] bits = bf.bitset.getPage(p); - for (int i = 0; i < pageSize && bitLength-- > 0; i++) - size += typeSizes.sizeof(bits[i]); // bucket - } + int size = typeSizes.sizeof(bf.getHashCount()); // hash count + size += bf.bitset.serializedSize(typeSizes); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Filter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Filter.java b/src/java/org/apache/cassandra/utils/Filter.java index f7ce1f3..ea98401 100644 --- a/src/java/org/apache/cassandra/utils/Filter.java +++ b/src/java/org/apache/cassandra/utils/Filter.java @@ -17,9 +17,10 @@ */ package org.apache.cassandra.utils; +import java.io.Closeable; import java.nio.ByteBuffer; -public abstract class Filter +public abstract class Filter implements Closeable { int hashCount; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/FilterFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java index 2c9bcf4..3eae519 100644 --- a/src/java/org/apache/cassandra/utils/FilterFactory.java +++ b/src/java/org/apache/cassandra/utils/FilterFactory.java @@ -22,6 +22,9 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.utils.obs.IBitSet; +import org.apache.cassandra.utils.obs.OffHeapBitSet; +import org.apache.cassandra.utils.obs.OpenBitSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,7 @@ public class FilterFactory { private static final Logger logger = LoggerFactory.getLogger(FilterFactory.class); private static final TypeSizes TYPE_SIZES = TypeSizes.NATIVE; + private static final long BITSET_EXCESS = 20; public enum Type { @@ -57,16 +61,16 @@ public class FilterFactory } } - public static Filter deserialize(DataInput input, Type type) throws IOException + public static Filter deserialize(DataInput input, Type type, boolean offheap) throws IOException { switch (type) { case SHA: return LegacyBloomFilter.serializer.deserialize(input); case MURMUR2: - return Murmur2BloomFilter.serializer.deserialize(input); + return Murmur2BloomFilter.serializer.deserialize(input, offheap); default: - return Murmur3BloomFilter.serializer.deserialize(input); + return Murmur3BloomFilter.serializer.deserialize(input, offheap); } } @@ -92,13 +96,13 @@ public class FilterFactory * @return A BloomFilter with the lowest practical false positive * probability for the given number of elements. */ - public static Filter getFilter(long numElements, int targetBucketsPerElem) + public static Filter getFilter(long numElements, int targetBucketsPerElem, boolean offheap) { - return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3); + return getFilter(numElements, targetBucketsPerElem, Type.MURMUR3, offheap); } // helper method for test. - static Filter getFilter(long numElements, int targetBucketsPerElem, Type type) + static Filter getFilter(long numElements, int targetBucketsPerElem, Type type, boolean offheap) { int maxBucketsPerElement = Math.max(1, BloomCalculations.maxBucketsPerElement(numElements)); int bucketsPerElement = Math.min(targetBucketsPerElem, maxBucketsPerElement); @@ -107,7 +111,7 @@ public class FilterFactory logger.warn(String.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).", numElements, bucketsPerElement, targetBucketsPerElem)); } BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement); - return createFilter(spec.K, numElements, spec.bucketsPerElement, type); + return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap); } /** @@ -117,33 +121,35 @@ public class FilterFactory * Asserts that the given probability can be satisfied using this * filter. */ - public static Filter getFilter(long numElements, double maxFalsePosProbability) + public static Filter getFilter(long numElements, double maxFalsePosProbability, boolean offheap) { - return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3); + return getFilter(numElements, maxFalsePosProbability, Type.MURMUR3, offheap); } // helper method for test. - static Filter getFilter(long numElements, double maxFalsePosProbability, Type type) + static Filter getFilter(long numElements, double maxFalsePosProbability, Type type, boolean offheap) { assert maxFalsePosProbability <= 1.0 : "Invalid probability"; int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability); - return createFilter(spec.K, numElements, spec.bucketsPerElement, type); + return createFilter(spec.K, numElements, spec.bucketsPerElement, type, offheap); } - private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type) + private static Filter createFilter(int hash, long numElements, int bucketsPer, Type type, boolean offheap) { + long numBits = (numElements * bucketsPer) + BITSET_EXCESS; + IBitSet bitset = offheap ? new OffHeapBitSet(numBits) : new OpenBitSet(numBits); switch (type) { case MURMUR2: - return new Murmur2BloomFilter(hash, numElements, bucketsPer); + return new Murmur2BloomFilter(hash, bitset); default: - return new Murmur3BloomFilter(hash, numElements, bucketsPer); + return new Murmur3BloomFilter(hash, bitset); } } public static BloomFilter emptyFilter() { - return new Murmur3BloomFilter(0, 0, 0); + return new Murmur3BloomFilter(0, new OpenBitSet(BITSET_EXCESS)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java index 6f7269e..a50e2c8 100644 --- a/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java +++ b/src/java/org/apache/cassandra/utils/LegacyBloomFilter.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.utils; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.BitSet; @@ -160,4 +161,9 @@ public class LegacyBloomFilter extends Filter public BitSet getBitSet(){ return filter; } + + public void close() throws IOException + { + // Do nothing for this + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java index df5a160..1c20dd1 100644 --- a/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/Murmur2BloomFilter.java @@ -19,19 +19,13 @@ package org.apache.cassandra.utils; import java.nio.ByteBuffer; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.utils.obs.OpenBitSet; +import org.apache.cassandra.utils.obs.IBitSet; public class Murmur2BloomFilter extends BloomFilter { - public static final ISerializer<BloomFilter> serializer = new Murmur2BloomFilterSerializer(); + public static final Murmur2BloomFilterSerializer serializer = new Murmur2BloomFilterSerializer(); - Murmur2BloomFilter(int hashes, long numElements, int bucketsPer) - { - super(hashes, numElements, bucketsPer); - } - - private Murmur2BloomFilter(int hashes, OpenBitSet bs) + public Murmur2BloomFilter(int hashes, IBitSet bs) { super(hashes, bs); } @@ -43,9 +37,9 @@ public class Murmur2BloomFilter extends BloomFilter return (new long[] { hash1, hash2 }); } - private static class Murmur2BloomFilterSerializer extends BloomFilterSerializer + public static class Murmur2BloomFilterSerializer extends BloomFilterSerializer { - protected BloomFilter createFilter(int hashes, OpenBitSet bs) + protected BloomFilter createFilter(int hashes, IBitSet bs) { return new Murmur2BloomFilter(hashes, bs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java index 304842a..ebd506c 100644 --- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java @@ -19,19 +19,13 @@ package org.apache.cassandra.utils; import java.nio.ByteBuffer; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.utils.obs.OpenBitSet; +import org.apache.cassandra.utils.obs.IBitSet; public class Murmur3BloomFilter extends BloomFilter { - public static final ISerializer<BloomFilter> serializer = new Murmur3BloomFilterSerializer(); + public static final Murmur3BloomFilterSerializer serializer = new Murmur3BloomFilterSerializer(); - Murmur3BloomFilter(int hashes, long numElements, int bucketsPer) - { - super(hashes, numElements, bucketsPer); - } - - private Murmur3BloomFilter(int hashes, OpenBitSet bs) + public Murmur3BloomFilter(int hashes, IBitSet bs) { super(hashes, bs); } @@ -41,9 +35,9 @@ public class Murmur3BloomFilter extends BloomFilter return MurmurHash.hash3_x64_128(b, b.position(), b.remaining(), seed); } - private static class Murmur3BloomFilterSerializer extends BloomFilterSerializer + public static class Murmur3BloomFilterSerializer extends BloomFilterSerializer { - protected BloomFilter createFilter(int hashes, OpenBitSet bs) + protected BloomFilter createFilter(int hashes, IBitSet bs) { return new Murmur3BloomFilter(hashes, bs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/IBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java new file mode 100644 index 0000000..c5a2fb8 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.obs; + +import java.io.Closeable; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; + +public interface IBitSet extends Closeable +{ + public long capacity(); + + /** + * Returns true or false for the specified bit index. The index should be + * less than the capacity. + */ + public boolean get(long index); + + /** + * Sets the bit at the specified index. The index should be less than the + * capacity. + */ + public void set(long index); + + /** + * clears the bit. The index should be less than the capacity. + */ + public void clear(long index); + + public void serialize(DataOutput dos) throws IOException; + + public long serializedSize(TypeSizes type); + + public void clear(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java new file mode 100644 index 0000000..1733a81 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.obs; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.cache.RefCountedMemory; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.Memory; + +/** + * Off-heap bitset, + * file compatible with OpeBitSet + */ +public class OffHeapBitSet implements IBitSet +{ + private final Memory bytes; + + public OffHeapBitSet(long numBits) + { + // OpenBitSet.bits2words calculation is there for backward compatibility. + int byteCount = OpenBitSet.bits2words(numBits) * 8; + bytes = RefCountedMemory.allocate(byteCount); + // flush/clear the existing memory. + clear(); + } + + private OffHeapBitSet(Memory bytes) + { + this.bytes = bytes; + } + + public long capacity() + { + return bytes.size() * 8; + } + + public boolean get(long index) + { + long i = index >> 3; + long bit = index & 0x7; + int bitmask = 0x1 << bit; + return ((bytes.getByte(i) & 0xFF) & bitmask) != 0; + } + + public void set(long index) + { + long i = index >> 3; + long bit = index & 0x7; + int bitmask = 0x1 << bit; + bytes.setByte(i, (byte) (bitmask | bytes.getByte(i))); + } + + public void set(long offset, byte b) + { + bytes.setByte(offset, b); + } + + public void clear(long index) + { + long i = index >> 3; + long bit = index & 0x7; + int bitmask = 0x1 << bit; + int nativeByte = (bytes.getByte(i) & 0xFF); + nativeByte &= ~bitmask; + bytes.setByte(i, (byte) nativeByte); + } + + public void clear() + { + bytes.setMemory(0, bytes.size(), (byte) 0); + } + + public void serialize(DataOutput dos) throws IOException + { + dos.writeInt((int) (bytes.size() / 8)); + for (long i = 0; i < bytes.size();) + { + long value = ((bytes.getByte(i++) & 0xff) << 0) + + ((bytes.getByte(i++) & 0xff) << 8) + + ((bytes.getByte(i++) & 0xff) << 16) + + ((long) (bytes.getByte(i++) & 0xff) << 24) + + ((long) (bytes.getByte(i++) & 0xff) << 32) + + ((long) (bytes.getByte(i++) & 0xff) << 40) + + ((long) (bytes.getByte(i++) & 0xff) << 48) + + ((long) bytes.getByte(i++) << 56); + dos.writeLong(value); + } + } + + public long serializedSize(TypeSizes type) + { + return type.sizeof((int) bytes.size()) + bytes.size(); + } + + public static OffHeapBitSet deserialize(DataInput dis) throws IOException + { + int byteCount = dis.readInt() * 8; + Memory memory = RefCountedMemory.allocate(byteCount); + for (int i = 0; i < byteCount;) + { + long v = dis.readLong(); + memory.setByte(i++, (byte) (v >>> 0)); + memory.setByte(i++, (byte) (v >>> 8)); + memory.setByte(i++, (byte) (v >>> 16)); + memory.setByte(i++, (byte) (v >>> 24)); + memory.setByte(i++, (byte) (v >>> 32)); + memory.setByte(i++, (byte) (v >>> 40)); + memory.setByte(i++, (byte) (v >>> 48)); + memory.setByte(i++, (byte) (v >>> 56)); + } + return new OffHeapBitSet(memory); + } + + public void close() throws IOException + { + bytes.free(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (!(o instanceof OffHeapBitSet)) + return false; + OffHeapBitSet b = (OffHeapBitSet) o; + return bytes.equals(b.bytes); + } + + @Override + public int hashCode() + { + // Similar to open bitset. + long h = 0; + for (long i = bytes.size(); --i >= 0;) + { + h ^= bytes.getByte(i); + h = (h << 1) | (h >>> 63); // rotate left + } + return (int) ((h >> 32) ^ h) + 0x98761234; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java index 1ddbe8f..4fce3f8 100644 --- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java +++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java @@ -18,8 +18,11 @@ package org.apache.cassandra.utils.obs; import java.util.Arrays; -import java.io.Serializable; -import java.util.BitSet; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; /** * An "open" BitSet implementation that allows direct access to the arrays of words @@ -43,7 +46,8 @@ import java.util.BitSet; * class, use <code>java.util.BitSet</code>. */ -public class OpenBitSet implements Serializable { +public class OpenBitSet implements IBitSet +{ /** * We break the bitset up into multiple arrays to avoid promotion failure caused by attempting to allocate * large, contiguous arrays (CASSANDRA-2466). All sub-arrays but the last are uniformly PAGE_SIZE words; @@ -302,7 +306,7 @@ public class OpenBitSet implements Serializable { int newLen= Math.min(this.wlen,other.wlen); long[][] thisArr = this.bits; long[][] otherArr = other.bits; - int thisPageSize = this.PAGE_SIZE; + int thisPageSize = PAGE_SIZE; int otherPageSize = other.PAGE_SIZE; // testing against zero can be more efficient int pos=newLen; @@ -383,6 +387,54 @@ public class OpenBitSet implements Serializable { return (int)((h>>32) ^ h) + 0x98761234; } + public void close() throws IOException { + // noop, let GC do the cleanup. + } + + public void serialize(DataOutput dos) throws IOException { + int bitLength = getNumWords(); + int pageSize = getPageSize(); + int pageCount = getPageCount(); + + dos.writeInt(bitLength); + for (int p = 0; p < pageCount; p++) { + long[] bits = getPage(p); + for (int i = 0; i < pageSize && bitLength-- > 0; i++) { + dos.writeLong(bits[i]); + } + } } + public long serializedSize(TypeSizes type) { + int bitLength = getNumWords(); + int pageSize = getPageSize(); + int pageCount = getPageCount(); + + long size = type.sizeof(bitLength); // length + for (int p = 0; p < pageCount; p++) { + long[] bits = getPage(p); + for (int i = 0; i < pageSize && bitLength-- > 0; i++) + size += type.sizeof(bits[i]); // bucket + } + return size; + } + + public void clear() { + clear(0, capacity()); + } + + public static OpenBitSet deserialize(DataInput dis) throws IOException { + long bitLength = dis.readInt(); + + OpenBitSet bs = new OpenBitSet(bitLength << 6); + int pageSize = bs.getPageSize(); + int pageCount = bs.getPageCount(); + for (int p = 0; p < pageCount; p++) { + long[] bits = bs.getPage(p); + for (int i = 0; i < pageSize && bitLength-- > 0; i++) + bits[i] = dis.readLong(); + } + return bs; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBitSetTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/utils/LongBitSetTest.java b/test/long/org/apache/cassandra/utils/LongBitSetTest.java new file mode 100644 index 0000000..7941fae --- /dev/null +++ b/test/long/org/apache/cassandra/utils/LongBitSetTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.cassandra.utils.obs.OffHeapBitSet; +import org.apache.cassandra.utils.obs.OpenBitSet; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LongBitSetTest +{ + private static final Logger logger = LoggerFactory.getLogger(LongBitSetTest.class); + private static final Random random = new Random(); + + public void populateRandom(OffHeapBitSet offbs, OpenBitSet obs, long index) + { + if (random.nextBoolean()) + { + offbs.set(index); + obs.set(index); + } + } + + public void compare(OffHeapBitSet offbs, OpenBitSet obs, long index) + { + if (offbs.get(index) != obs.get(index)) + throw new RuntimeException(); + Assert.assertEquals(offbs.get(index), obs.get(index)); + } + + @Test + public void testBitSetOperations() + { + long size_to_test = Integer.MAX_VALUE / 40; + long size_and_excess = size_to_test + 20; + OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess); + OpenBitSet obs = new OpenBitSet(size_and_excess); + for (long i = 0; i < size_to_test; i++) + populateRandom(offbs, obs, i); + + for (long i = 0; i < size_to_test; i++) + compare(offbs, obs, i); + } + + @Test + public void timeit() + { + long size_to_test = Integer.MAX_VALUE / 10; // about 214 million + long size_and_excess = size_to_test + 20; + + OpenBitSet obs = new OpenBitSet(size_and_excess); + OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess); + logger.info("||Open BS set's|Open BS get's|Open BS clear's|Offheap BS set's|Offheap BS get's|Offheap BS clear's|"); + // System.out.println("||Open BS set's|Open BS get's|Open BS clear's|Offheap BS set's|Offheap BS get's|Offheap BS clear's|"); + loopOnce(obs, offbs, size_to_test); + } + + public void loopOnce(OpenBitSet obs, OffHeapBitSet offbs, long size_to_test) + { + StringBuffer buffer = new StringBuffer(); + // start off fresh. + System.gc(); + long start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + obs.set(i); + buffer.append("||").append(System.currentTimeMillis() - start); + + start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + obs.get(i); + buffer.append("|").append(System.currentTimeMillis() - start); + + start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + obs.clear(i); + buffer.append("|").append(System.currentTimeMillis() - start); + + System.gc(); + start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + offbs.set(i); + buffer.append("|").append(System.currentTimeMillis() - start); + + start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + offbs.get(i); + + buffer.append("|").append(System.currentTimeMillis() - start); + start = System.currentTimeMillis(); + for (long i = 0; i < size_to_test; i++) + offbs.clear(i); + buffer.append("|").append(System.currentTimeMillis() - start).append("|"); + logger.info(buffer.toString()); + // System.out.println(buffer.toString()); + } + + /** + * Just to make sure JIT doesn't come on our way + */ + @Test + // @Ignore + public void loopIt() + { + long size_to_test = Integer.MAX_VALUE / 10; // about 214 million + long size_and_excess = size_to_test + 20; + + OpenBitSet obs = new OpenBitSet(size_and_excess); + OffHeapBitSet offbs = new OffHeapBitSet(size_and_excess); + for (int i = 0; i < 10; i++) + // 10 times to do approx 2B keys each. + loopOnce(obs, offbs, size_to_test); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java index d4a4c34..06ad642 100644 --- a/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java +++ b/test/long/org/apache/cassandra/utils/LongBloomFilterTest.java @@ -34,7 +34,7 @@ public class LongBloomFilterTest public void testBigInt(FilterFactory.Type type) { int size = 10 * 1000 * 1000; - Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type); + Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false); double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.IntGenerator(size), new KeyGenerator.IntGenerator(size, size * 2)); logger.info("Bloom filter false positive: {}", fp); @@ -43,7 +43,7 @@ public class LongBloomFilterTest public void testBigRandom(FilterFactory.Type type) { int size = 10 * 1000 * 1000; - Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type); + Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false); double fp = FilterTestHelper.testFalsePositives(bf, new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size), new KeyGenerator.RandomStringGenerator(new Random().nextInt(), size)); logger.info("Bloom filter false positive: {}", fp); @@ -52,7 +52,7 @@ public class LongBloomFilterTest public void timeit(FilterFactory.Type type) { int size = 300 * FilterTestHelper.ELEMENTS; - Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type); + Filter bf = FilterFactory.getFilter(size, FilterTestHelper.spec.bucketsPerElement, type, false); double sumfp = 0; for (int i = 0; i < 10; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BitSetTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BitSetTest.java b/test/unit/org/apache/cassandra/utils/BitSetTest.java new file mode 100644 index 0000000..9684131 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/BitSetTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.utils.KeyGenerator.WordGenerator; +import org.apache.cassandra.utils.obs.IBitSet; +import org.apache.cassandra.utils.obs.OffHeapBitSet; +import org.apache.cassandra.utils.obs.OpenBitSet; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import static junit.framework.Assert.assertEquals; + +public class BitSetTest +{ + /** + * Test bitsets in a "real-world" environment, i.e., bloom filters + */ + @Test + public void compareBitSets() + { + BloomFilter bf2 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, false); + BloomFilter bf3 = (BloomFilter) FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true); + int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2; + WordGenerator gen1 = new KeyGenerator.WordGenerator(skipEven, 2); + + // make sure both bitsets are empty. + compare(bf2.bitset, bf3.bitset); + + while (gen1.hasNext()) + { + ByteBuffer key = gen1.next(); + bf2.add(key); + bf3.add(key); + } + + compare(bf2.bitset, bf3.bitset); + } + + private static final String LEGACY_SST_FILE = "test/data/legacy-sstables/hb/Keyspace1/Keyspace1-Standard1-hb-0-Filter.db"; + + /** + * Test compatibility with a 1.1-version data file + */ + @Test + public void testExpectedCompatablity() throws IOException + { + DataInputStream dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE))); + dis.readInt(); // bloom filter hash count + OpenBitSet bs = OpenBitSet.deserialize(dis); + + dis = new DataInputStream(new FileInputStream(new File(LEGACY_SST_FILE))); + dis.readInt(); // bloom filter hash count + OffHeapBitSet obs = OffHeapBitSet.deserialize(dis); + + compare(obs, bs); + } + + private static final Random random = new Random(); + + /** + * Test serialization and de-serialization in-memory + */ + @Test + public void testOffHeapSerialization() throws IOException + { + OffHeapBitSet bs = new OffHeapBitSet(100000); + populateAndReserialize(bs); + } + + @Test + public void testOffHeapCompatibility() throws IOException + { + OpenBitSet bs = new OpenBitSet(100000); + populateAndReserialize(bs); + } + + private void populateAndReserialize(IBitSet bs) throws IOException + { + for (long i = 0; i < bs.capacity(); i++) + if (random.nextBoolean()) + bs.set(i); + + DataOutputBuffer dos = new DataOutputBuffer(); + bs.serialize(dos); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.getData())); + OffHeapBitSet newbs = OffHeapBitSet.deserialize(dis); + compare(bs, newbs); + } + + private void compare(IBitSet bs, IBitSet newbs) + { + assertEquals(bs.capacity(), newbs.capacity()); + for (long i = 0; i < bs.capacity(); i++) + Assert.assertEquals(bs.get(i), newbs.get(i)); + } + + @Test + public void testBitClear() throws IOException + { + int size = Integer.MAX_VALUE / 4000; + OffHeapBitSet bitset = new OffHeapBitSet(size); + List<Integer> randomBits = Lists.newArrayList(); + for (int i = 0; i < 10; i++) + randomBits.add(random.nextInt(size)); + + for (long randomBit : randomBits) + bitset.set(randomBit); + + for (long randomBit : randomBits) + Assert.assertEquals(true, bitset.get(randomBit)); + + for (long randomBit : randomBits) + bitset.clear(randomBit); + + for (long randomBit : randomBits) + Assert.assertEquals(false, bitset.get(randomBit)); + bitset.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/BloomFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java index d8f596f..292bca6 100644 --- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java +++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java @@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.utils.KeyGenerator.WordGenerator; import org.junit.Before; import org.junit.Test; @@ -37,7 +38,7 @@ public class BloomFilterTest public BloomFilterTest() { - bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE); + bf = FilterFactory.getFilter(10000L, FilterTestHelper.MAX_FAILURE_RATE, true); } public static Filter testSerialize(Filter f) throws IOException @@ -47,7 +48,7 @@ public class BloomFilterTest FilterFactory.serialize(f, out, FilterFactory.Type.MURMUR3); ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength()); - Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3); + Filter f2 = FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.MURMUR3, true); assert f2.isPresent(ByteBufferUtil.bytes("a")); assert !f2.isPresent(ByteBufferUtil.bytes("b")); @@ -101,7 +102,7 @@ public class BloomFilterTest { return; } - Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE); + Filter bf2 = FilterFactory.getFilter(KeyGenerator.WordGenerator.WORDS / 2, FilterTestHelper.MAX_FAILURE_RATE, true); int skipEven = KeyGenerator.WordGenerator.WORDS % 2 == 0 ? 0 : 2; FilterTestHelper.testFalsePositives(bf2, new KeyGenerator.WordGenerator(skipEven, 2), @@ -123,7 +124,7 @@ public class BloomFilterTest { hashes.clear(); ByteBuffer buf = keys.next(); - BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 10); + BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 1, false); for (long hashIndex : bf.getHashBuckets(buf, MAX_HASH_COUNT, 1024 * 1024)) { hashes.add(hashIndex); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java index 248f325..d92315b 100644 --- a/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java +++ b/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java @@ -46,7 +46,7 @@ public class LegacyBloomFilterTest FilterFactory.serialize(f, out, FilterFactory.Type.SHA); ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength()); - LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA); + LegacyBloomFilter f2 = (LegacyBloomFilter) FilterFactory.deserialize(new DataInputStream(in), FilterFactory.Type.SHA, false); assert f2.isPresent(ByteBufferUtil.bytes("a")); assert !f2.isPresent(ByteBufferUtil.bytes("b")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc37dea7/test/unit/org/apache/cassandra/utils/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java index 08053df..8d5e88a 100644 --- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java @@ -31,9 +31,9 @@ import java.nio.ByteBuffer; public class SerializationsTest extends AbstractSerializationsTester { - private void testBloomFilterWrite(Type murmur) throws IOException + private void testBloomFilterWrite(Type murmur, boolean offheap) throws IOException { - Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur); + Filter bf = FilterFactory.getFilter(1000000, 0.0001, murmur, offheap); for (int i = 0; i < 100; i++) bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken())); DataOutputStream out = getOutput("utils.BloomFilter.bin"); @@ -45,10 +45,10 @@ public class SerializationsTest extends AbstractSerializationsTester public void testBloomFilterReadMURMUR2() throws IOException { if (EXECUTE_WRITES) - testBloomFilterWrite(FilterFactory.Type.MURMUR2); + testBloomFilterWrite(FilterFactory.Type.MURMUR2, false); DataInputStream in = getInput("utils.BloomFilter.bin"); - assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2) != null; + assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR2, false) != null; in.close(); } @@ -56,10 +56,10 @@ public class SerializationsTest extends AbstractSerializationsTester public void testBloomFilterReadMURMUR3() throws IOException { if (EXECUTE_WRITES) - testBloomFilterWrite(FilterFactory.Type.MURMUR3); + testBloomFilterWrite(FilterFactory.Type.MURMUR3, true); DataInputStream in = getInput("utils.BloomFilter.bin"); - assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3) != null; + assert FilterFactory.deserialize(in, FilterFactory.Type.MURMUR3, true) != null; in.close(); } @@ -87,7 +87,7 @@ public class SerializationsTest extends AbstractSerializationsTester // testLegacyBloomFilterWrite(); DataInputStream in = getInput("utils.LegacyBloomFilter.bin"); - assert FilterFactory.deserialize(in, FilterFactory.Type.SHA) != null; + assert FilterFactory.deserialize(in, FilterFactory.Type.SHA, false) != null; in.close(); }
