Repository: cassandra Updated Branches: refs/heads/trunk dfd695c14 -> 18d8f26dd
Evaluate MurmurHash of Token once per query patch by branimir; reviewed by benedict for CASSANDRA-7096 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/18d8f26d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/18d8f26d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/18d8f26d Branch: refs/heads/trunk Commit: 18d8f26ddd556d6b689a7460053c1518a86d7288 Parents: dfd695c Author: Benedict Elliott Smith <[email protected]> Authored: Thu Feb 19 12:34:27 2015 +0000 Committer: Benedict Elliott Smith <[email protected]> Committed: Thu Feb 19 12:34:27 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/db/CachedHashDecoratedKey.java | 52 ++++++++++++++ .../org/apache/cassandra/db/DecoratedKey.java | 10 ++- .../db/compaction/CompactionController.java | 2 +- .../apache/cassandra/dht/LocalPartitioner.java | 4 +- .../cassandra/dht/Murmur3Partitioner.java | 17 ++++- .../dht/OrderPreservingPartitioner.java | 4 +- .../apache/cassandra/dht/RandomPartitioner.java | 4 +- .../io/sstable/format/SSTableReader.java | 2 +- .../io/sstable/format/big/BigTableReader.java | 2 +- .../io/sstable/format/big/BigTableWriter.java | 2 +- .../cassandra/utils/AlwaysPresentFilter.java | 6 +- .../org/apache/cassandra/utils/BloomFilter.java | 36 +++++++--- .../cassandra/utils/BloomFilterSerializer.java | 7 +- .../apache/cassandra/utils/FilterFactory.java | 6 +- .../org/apache/cassandra/utils/IFilter.java | 12 ++-- .../cassandra/utils/Murmur3BloomFilter.java | 67 ------------------ .../org/apache/cassandra/utils/BitSetTest.java | 8 +-- .../apache/cassandra/utils/BloomFilterTest.java | 72 +++++++++++++++++--- .../cassandra/utils/FilterTestHelper.java | 24 ++++++- .../cassandra/utils/SerializationsTest.java | 5 +- 21 files changed, 223 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4906a38..3e2eddd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,6 +56,8 @@ * Fail on very large batch sizes (CASSANDRA-8011) * Improve concurrency of repair (CASSANDRA-6455, 8208) * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096) + 2.1.4 * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/db/CachedHashDecoratedKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CachedHashDecoratedKey.java b/src/java/org/apache/cassandra/db/CachedHashDecoratedKey.java new file mode 100644 index 0000000..5b81e73 --- /dev/null +++ b/src/java/org/apache/cassandra/db/CachedHashDecoratedKey.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.dht.Token; + +public class CachedHashDecoratedKey extends BufferDecoratedKey +{ + long hash0; + long hash1; + volatile boolean hashCached; + + public CachedHashDecoratedKey(Token token, ByteBuffer key) + { + super(token, key); + hashCached = false; + } + + @Override + public void filterHash(long[] dest) + { + if (hashCached) + { + dest[0] = hash0; + dest[1] = hash1; + } + else + { + super.filterHash(dest); + hash0 = dest[0]; + hash1 = dest[1]; + hashCached = true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/db/DecoratedKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java index 365d261..cc62a15 100644 --- a/src/java/org/apache/cassandra/db/DecoratedKey.java +++ b/src/java/org/apache/cassandra/db/DecoratedKey.java @@ -24,6 +24,8 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.dht.Token.KeyBound; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MurmurHash; +import org.apache.cassandra.utils.IFilter.FilterKey; /** * Represents a decorated key, handy for certain operations @@ -34,7 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; * if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the * OrderPreservingPartitioner classes). */ -public abstract class DecoratedKey implements RowPosition +public abstract class DecoratedKey implements RowPosition, FilterKey { public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>() { @@ -129,4 +131,10 @@ public abstract class DecoratedKey implements RowPosition } public abstract ByteBuffer getKey(); + + public void filterHash(long[] dest) + { + ByteBuffer key = getKey(); + MurmurHash.hash3_x64_128(key, key.position(), key.remaining(), 0, dest); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 1f9c34f..148b1b6 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -178,7 +178,7 @@ public class CompactionController implements AutoCloseable // we check index file instead. if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null) min = Math.min(min, sstable.getMinTimestamp()); - else if (sstable.getBloomFilter().isPresent(key.getKey())) + else if (sstable.getBloomFilter().isPresent(key)) min = Math.min(min, sstable.getMinTimestamp()); } return min; http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/dht/LocalPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index dfb0e7d..01dc75e 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -22,8 +22,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; @@ -41,7 +41,7 @@ public class LocalPartitioner implements IPartitioner public DecoratedKey decorateKey(ByteBuffer key) { - return new BufferDecoratedKey(getToken(key), key); + return new CachedHashDecoratedKey(getToken(key), key); } public Token midpoint(Token left, Token right) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index 11d3abc..0c3c094 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PreHashedDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.exceptions.ConfigurationException; @@ -51,7 +51,8 @@ public class Murmur3Partitioner implements IPartitioner public DecoratedKey decorateKey(ByteBuffer key) { - return new BufferDecoratedKey(getToken(key), key); + long[] hash = getHash(key); + return new PreHashedDecoratedKey(getToken(key, hash), key, hash[0], hash[1]); } public Token midpoint(Token lToken, Token rToken) @@ -149,12 +150,22 @@ public class Murmur3Partitioner implements IPartitioner */ public LongToken getToken(ByteBuffer key) { + return getToken(key, getHash(key)); + } + + private LongToken getToken(ByteBuffer key, long[] hash) + { if (key.remaining() == 0) return MINIMUM; + return new LongToken(normalize(hash[0])); + } + + private long[] getHash(ByteBuffer key) + { long[] hash = new long[2]; MurmurHash.hash3_x64_128(key, key.position(), key.remaining(), 0, hash); - return new LongToken(normalize(hash[0])); + return hash; } public LongToken getRandomToken() http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index 86ff184..cffa4fc 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -23,8 +23,8 @@ import java.nio.charset.CharacterCodingException; import java.util.*; import org.apache.cassandra.config.*; -import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.ConfigurationException; @@ -47,7 +47,7 @@ public class OrderPreservingPartitioner implements IPartitioner public DecoratedKey decorateKey(ByteBuffer key) { - return new BufferDecoratedKey(getToken(key), key); + return new CachedHashDecoratedKey(getToken(key), key); } public StringToken midpoint(Token ltoken, Token rtoken) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index eec08b8..71a0a99 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -24,7 +24,7 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; -import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; @@ -50,7 +50,7 @@ public class RandomPartitioner implements IPartitioner public DecoratedKey decorateKey(ByteBuffer key) { - return new BufferDecoratedKey(getToken(key), key); + return new CachedHashDecoratedKey(getToken(key), key); } public Token midpoint(Token ltoken, Token rtoken) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index aaf19b2..0b55794 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -699,7 +699,7 @@ public abstract class SSTableReader extends SSTable implements RefCounted<SSTabl last = decoratedKey; if (recreateBloomFilter) - bf.add(decoratedKey.getKey()); + bf.add(decoratedKey); // if summary was already read from disk we don't want to re-populate it using primary index if (!summaryLoaded) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 127a60c..dec9f11 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -114,7 +114,7 @@ public class BigTableReader extends SSTableReader if (op == Operator.EQ) { assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key - if (!bf.isPresent(((DecoratedKey)key).getKey())) + if (!bf.isPresent((DecoratedKey)key)) { Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation); return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index d411faf..0186c68 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -511,7 +511,7 @@ public class BigTableWriter extends SSTableWriter public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) { - bf.add(key.getKey()); + bf.add(key); long indexStart = indexFile.getFilePointer(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java index 1a029e5..a7f6fce 100644 --- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java +++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java @@ -17,16 +17,14 @@ */ package org.apache.cassandra.utils; -import java.nio.ByteBuffer; - public class AlwaysPresentFilter implements IFilter { - public boolean isPresent(ByteBuffer key) + public boolean isPresent(FilterKey key) { return true; } - public void add(ByteBuffer key) { } + public void add(FilterKey key) { } public void clear() { } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java index 77b2d44..9de202c 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilter.java +++ b/src/java/org/apache/cassandra/utils/BloomFilter.java @@ -17,14 +17,13 @@ */ package org.apache.cassandra.utils; -import java.nio.ByteBuffer; - import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable; import org.apache.cassandra.utils.obs.IBitSet; +import org.apache.cassandra.db.TypeSizes; -public abstract class BloomFilter extends WrappedSharedCloseable implements IFilter +public class BloomFilter extends WrappedSharedCloseable implements IFilter { private static final ThreadLocal<long[]> reusableIndexes = new ThreadLocal<long[]>() { @@ -51,20 +50,26 @@ public abstract class BloomFilter extends WrappedSharedCloseable implements IFil this.bitset = copy.bitset; } + public static final BloomFilterSerializer serializer = new BloomFilterSerializer(); + + public long serializedSize() + { + return serializer.serializedSize(this, TypeSizes.NATIVE); + } + // Murmur is faster than an SHA-based approach and provides as-good collision // resistance. The combinatorial generation approach described in // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf // does prove to work in actual tests, and is obviously faster // than performing further iterations of murmur. - protected abstract void hash(ByteBuffer b, int position, int remaining, long seed, long[] result); // tests ask for ridiculous numbers of hashes so here is a special case for them // rather than using the threadLocal like we do in production @VisibleForTesting - public long[] getHashBuckets(ByteBuffer key, int hashCount, long max) + public long[] getHashBuckets(FilterKey key, int hashCount, long max) { long[] hash = new long[2]; - hash(key, key.position(), key.remaining(), 0L, hash); + key.filterHash(hash); long[] indexes = new long[hashCount]; setIndexes(hash[0], hash[1], hashCount, max, indexes); return indexes; @@ -74,12 +79,12 @@ public abstract class BloomFilter extends WrappedSharedCloseable implements IFil // to avoid generating a lot of garbage since stack allocation currently does not support stores // (CASSANDRA-6609). it returns the array so that the caller does not need to perform // a second threadlocal lookup. - private long[] indexes(ByteBuffer key) + private long[] indexes(FilterKey key) { // we use the same array both for storing the hash result, and for storing the indexes we return, // so that we do not need to allocate two arrays. long[] indexes = reusableIndexes.get(); - hash(key, key.position(), key.remaining(), 0L, indexes); + key.filterHash(indexes); setIndexes(indexes[0], indexes[1], hashCount, bitset.capacity(), indexes); return indexes; } @@ -93,7 +98,7 @@ public abstract class BloomFilter extends WrappedSharedCloseable implements IFil } } - public void add(ByteBuffer key) + public void add(FilterKey key) { long[] indexes = indexes(key); for (int i = 0; i < hashCount; i++) @@ -102,7 +107,7 @@ public abstract class BloomFilter extends WrappedSharedCloseable implements IFil } } - public final boolean isPresent(ByteBuffer key) + public final boolean isPresent(FilterKey key) { long[] indexes = indexes(key); for (int i = 0; i < hashCount; i++) @@ -119,4 +124,15 @@ public abstract class BloomFilter extends WrappedSharedCloseable implements IFil { bitset.clear(); } + + public IFilter sharedCopy() + { + return new BloomFilter(this); + } + + @Override + public long offHeapSize() + { + return bitset.offHeapSize(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java index b95544c..5fad3ea 100644 --- a/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java +++ b/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java @@ -27,7 +27,7 @@ import org.apache.cassandra.utils.obs.IBitSet; import org.apache.cassandra.utils.obs.OffHeapBitSet; import org.apache.cassandra.utils.obs.OpenBitSet; -abstract class BloomFilterSerializer implements ISerializer<BloomFilter> +class BloomFilterSerializer implements ISerializer<BloomFilter> { public void serialize(BloomFilter bf, DataOutputPlus out) throws IOException { @@ -47,7 +47,10 @@ abstract class BloomFilterSerializer implements ISerializer<BloomFilter> return createFilter(hashes, bs); } - protected abstract BloomFilter createFilter(int hashes, IBitSet bs); + BloomFilter createFilter(int hashes, IBitSet bs) + { + return new BloomFilter(hashes, bs); + } /** * Calculates a serialized size of the given Bloom Filter http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/FilterFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java index 757e8dd..7cfc332 100644 --- a/src/java/org/apache/cassandra/utils/FilterFactory.java +++ b/src/java/org/apache/cassandra/utils/FilterFactory.java @@ -37,12 +37,12 @@ public class FilterFactory public static void serialize(IFilter bf, DataOutputPlus output) throws IOException { - Murmur3BloomFilter.serializer.serialize((Murmur3BloomFilter) bf, output); + BloomFilter.serializer.serialize((BloomFilter) bf, output); } public static IFilter deserialize(DataInput input, boolean offheap) throws IOException { - return Murmur3BloomFilter.serializer.deserialize(input, offheap); + return BloomFilter.serializer.deserialize(input, offheap); } /** @@ -82,6 +82,6 @@ public class FilterFactory { long numBits = (numElements * bucketsPer) + BITSET_EXCESS; IBitSet bitset = offheap ? new OffHeapBitSet(numBits) : new OpenBitSet(numBits); - return new Murmur3BloomFilter(hash, bitset); + return new BloomFilter(hash, bitset); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/IFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java index bde6333..2f59864 100644 --- a/src/java/org/apache/cassandra/utils/IFilter.java +++ b/src/java/org/apache/cassandra/utils/IFilter.java @@ -17,15 +17,19 @@ */ package org.apache.cassandra.utils; -import java.nio.ByteBuffer; - import org.apache.cassandra.utils.concurrent.SharedCloseable; public interface IFilter extends SharedCloseable { - void add(ByteBuffer key); + public interface FilterKey + { + /** Places the murmur3 hash of the key in the given long array of size at least two. */ + void filterHash(long[] dest); + } + + void add(FilterKey key); - boolean isPresent(ByteBuffer key); + boolean isPresent(FilterKey key); void clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java deleted file mode 100644 index 431ca5b..0000000 --- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.utils; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.utils.obs.IBitSet; - -public class Murmur3BloomFilter extends BloomFilter -{ - public static final Murmur3BloomFilterSerializer serializer = new Murmur3BloomFilterSerializer(); - - public Murmur3BloomFilter(int hashCount, IBitSet bs) - { - super(hashCount, bs); - } - - protected Murmur3BloomFilter(Murmur3BloomFilter copy) - { - super(copy); - } - - public long serializedSize() - { - return serializer.serializedSize(this, TypeSizes.NATIVE); - } - - public IFilter sharedCopy() - { - return new Murmur3BloomFilter(this); - } - - @Override - public long offHeapSize() - { - return bitset.offHeapSize(); - } - - protected void hash(ByteBuffer b, int position, int remaining, long seed, long[] result) - { - MurmurHash.hash3_x64_128(b, b.position(), b.remaining(), seed, result); - } - - public static class Murmur3BloomFilterSerializer extends BloomFilterSerializer - { - protected BloomFilter createFilter(int hashes, IBitSet bs) - { - return new Murmur3BloomFilter(hashes, bs); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/test/unit/org/apache/cassandra/utils/BitSetTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BitSetTest.java b/test/unit/org/apache/cassandra/utils/BitSetTest.java index 2f1e7c6..9d82edf 100644 --- a/test/unit/org/apache/cassandra/utils/BitSetTest.java +++ b/test/unit/org/apache/cassandra/utils/BitSetTest.java @@ -20,15 +20,15 @@ package org.apache.cassandra.utils; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.Random; import com.google.common.collect.Lists; -import org.junit.Test; +import org.junit.Test; import org.junit.Assert; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.utils.IFilter.FilterKey; import org.apache.cassandra.utils.KeyGenerator.RandomStringGenerator; import org.apache.cassandra.utils.obs.IBitSet; import org.apache.cassandra.utils.obs.OffHeapBitSet; @@ -54,7 +54,7 @@ public class BitSetTest while (gen1.hasNext()) { - ByteBuffer key = gen1.next(); + FilterKey key = FilterTestHelper.wrap(gen1.next()); bf2.add(key); bf3.add(key); } @@ -100,7 +100,7 @@ public class BitSetTest } } - private void compare(IBitSet bs, IBitSet newbs) + static void compare(IBitSet bs, IBitSet newbs) { assertEquals(bs.capacity(), newbs.capacity()); for (long i = 0; i < bs.capacity(); i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/test/unit/org/apache/cassandra/utils/BloomFilterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java index ec8c02a..bbf0116 100644 --- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java +++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java @@ -27,13 +27,19 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Iterator; +import java.util.Random; import java.util.Set; import org.junit.*; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.IFilter.FilterKey; +import org.apache.cassandra.utils.KeyGenerator.RandomStringGenerator; +import org.apache.cassandra.utils.BloomFilter; public class BloomFilterTest { @@ -46,15 +52,15 @@ public class BloomFilterTest public static IFilter testSerialize(IFilter f) throws IOException { - f.add(ByteBufferUtil.bytes("a")); + f.add(FilterTestHelper.bytes("a")); DataOutputBuffer out = new DataOutputBuffer(); FilterFactory.serialize(f, out); ByteArrayInputStream in = new ByteArrayInputStream(out.getData(), 0, out.getLength()); IFilter f2 = FilterFactory.deserialize(new DataInputStream(in), true); - assert f2.isPresent(ByteBufferUtil.bytes("a")); - assert !f2.isPresent(ByteBufferUtil.bytes("b")); + assert f2.isPresent(FilterTestHelper.bytes("a")); + assert !f2.isPresent(FilterTestHelper.bytes("b")); return f2; } @@ -87,9 +93,9 @@ public class BloomFilterTest @Test public void testOne() { - bf.add(ByteBufferUtil.bytes("a")); - assert bf.isPresent(ByteBufferUtil.bytes("a")); - assert !bf.isPresent(ByteBufferUtil.bytes("b")); + bf.add(FilterTestHelper.bytes("a")); + assert bf.isPresent(FilterTestHelper.bytes("a")); + assert !bf.isPresent(FilterTestHelper.bytes("b")); } @Test @@ -133,7 +139,7 @@ public class BloomFilterTest while (keys.hasNext()) { hashes.clear(); - ByteBuffer buf = keys.next(); + FilterKey buf = FilterTestHelper.wrap(keys.next()); BloomFilter bf = (BloomFilter) FilterFactory.getFilter(10, 1, false); for (long hashIndex : bf.getHashBuckets(buf, MAX_HASH_COUNT, 1024 * 1024)) { @@ -159,6 +165,32 @@ public class BloomFilterTest } @Test + public void compareCachedKey() + { + BloomFilter bf1 = (BloomFilter) FilterFactory.getFilter(FilterTestHelper.ELEMENTS / 2, FilterTestHelper.MAX_FAILURE_RATE, false); + BloomFilter bf2 = (BloomFilter) FilterFactory.getFilter(FilterTestHelper.ELEMENTS / 2, FilterTestHelper.MAX_FAILURE_RATE, false); + BloomFilter bf3 = (BloomFilter) FilterFactory.getFilter(FilterTestHelper.ELEMENTS / 2, FilterTestHelper.MAX_FAILURE_RATE, false); + + RandomStringGenerator gen1 = new KeyGenerator.RandomStringGenerator(new Random().nextInt(), FilterTestHelper.ELEMENTS); + + // make sure all bitsets are empty. + BitSetTest.compare(bf1.bitset, bf2.bitset); + BitSetTest.compare(bf1.bitset, bf3.bitset); + + while (gen1.hasNext()) + { + ByteBuffer key = gen1.next(); + FilterKey cached = FilterTestHelper.wrapCached(key); + bf1.add(FilterTestHelper.wrap(key)); + bf2.add(cached); + bf3.add(cached); + } + + BitSetTest.compare(bf1.bitset, bf2.bitset); + BitSetTest.compare(bf1.bitset, bf3.bitset); + } + + @Test @Ignore public void testHugeBFSerialization() throws IOException { @@ -166,7 +198,7 @@ public class BloomFilterTest File file = FileUtils.createTempFile("bloomFilterTest-", ".dat"); BloomFilter filter = (BloomFilter) FilterFactory.getFilter(((long)Integer.MAX_VALUE / 8) + 1, 0.01d, true); - filter.add(test); + filter.add(FilterTestHelper.wrap(test)); DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(file)); FilterFactory.serialize(filter, out); filter.bitset.serialize(out); @@ -175,7 +207,29 @@ public class BloomFilterTest DataInputStream in = new DataInputStream(new FileInputStream(file)); BloomFilter filter2 = (BloomFilter) FilterFactory.deserialize(in, true); - Assert.assertTrue(filter2.isPresent(test)); + Assert.assertTrue(filter2.isPresent(FilterTestHelper.wrap(test))); FileUtils.closeQuietly(in); } + + @Test + public void testMurmur3FilterHash() + { + IPartitioner partitioner = new Murmur3Partitioner(); + Iterator<ByteBuffer> gen = new KeyGenerator.RandomStringGenerator(new Random().nextInt(), FilterTestHelper.ELEMENTS); + long[] expected = new long[2]; + long[] actual = new long[2]; + while (gen.hasNext()) + { + expected[0] = 1; + expected[1] = 2; + actual[0] = 3; + actual[1] = 4; + ByteBuffer key = gen.next(); + FilterKey expectedKey = FilterTestHelper.wrap(key); + FilterKey actualKey = partitioner.decorateKey(key); + actualKey.filterHash(actual); + expectedKey.filterHash(expected); + Assert.assertArrayEquals(expected, actual); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/test/unit/org/apache/cassandra/utils/FilterTestHelper.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/FilterTestHelper.java b/test/unit/org/apache/cassandra/utils/FilterTestHelper.java index cab7195..6d921cb 100644 --- a/test/unit/org/apache/cassandra/utils/FilterTestHelper.java +++ b/test/unit/org/apache/cassandra/utils/FilterTestHelper.java @@ -20,6 +20,11 @@ package org.apache.cassandra.utils; import java.nio.ByteBuffer; +import org.apache.cassandra.db.BufferDecoratedKey; +import org.apache.cassandra.db.CachedHashDecoratedKey; +import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; +import org.apache.cassandra.utils.IFilter.FilterKey; + public class FilterTestHelper { // used by filter subclass tests @@ -28,6 +33,21 @@ public class FilterTestHelper public static final BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(15, MAX_FAILURE_RATE); static final int ELEMENTS = 10000; + static final FilterKey bytes(String s) + { + return new BufferDecoratedKey(new LongToken(0L), ByteBufferUtil.bytes(s)); + } + + static final FilterKey wrap(ByteBuffer buf) + { + return new BufferDecoratedKey(new LongToken(0L), buf); + } + + static final FilterKey wrapCached(ByteBuffer buf) + { + return new CachedHashDecoratedKey(new LongToken(0L), buf); + } + static final ResetableIterator<ByteBuffer> intKeys() { return new KeyGenerator.IntGenerator(ELEMENTS); @@ -49,13 +69,13 @@ public class FilterTestHelper while (keys.hasNext()) { - f.add(keys.next()); + f.add(wrap(keys.next())); } int fp = 0; while (otherkeys.hasNext()) { - if (f.isPresent(otherkeys.next())) + if (f.isPresent(wrap(otherkeys.next()))) { fp++; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/18d8f26d/test/unit/org/apache/cassandra/utils/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java index d5b45d1..b3c545b 100644 --- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java @@ -19,9 +19,9 @@ package org.apache.cassandra.utils; import org.apache.cassandra.AbstractSerializationsTester; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.service.StorageService; - import org.junit.Assert; import org.junit.Test; @@ -33,10 +33,11 @@ public class SerializationsTest extends AbstractSerializationsTester private void testBloomFilterWrite(boolean offheap) throws IOException { + IPartitioner partitioner = StorageService.getPartitioner(); try (IFilter bf = FilterFactory.getFilter(1000000, 0.0001, offheap)) { for (int i = 0; i < 100; i++) - bf.add(StorageService.getPartitioner().getTokenFactory().toByteArray(StorageService.getPartitioner().getRandomToken())); + bf.add(partitioner.decorateKey(partitioner.getTokenFactory().toByteArray(partitioner.getRandomToken()))); try (DataOutputStreamAndChannel out = getOutput("utils.BloomFilter.bin")) { FilterFactory.serialize(bf, out);
