http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bit-util-test.cc b/be/src/kudu/util/bit-util-test.cc new file mode 100644 index 0000000..0d8eab4 --- /dev/null +++ b/be/src/kudu/util/bit-util-test.cc @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+// Exercises BitUtil::TrailingBits, which extracts the 'num_bits'
+// least-significant bits of a 64-bit value.
+TEST(BitUtil, TrailingBits) {
+  // A zero-width request always yields 0, regardless of input.
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 1), 1);
+  // A width of 64 (the full word) returns the value unchanged.
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 64),
+            BOOST_BINARY(1 1 1 1 1 1 1 1));
+  // Widths beyond 64 are clamped to the full word instead of producing an
+  // undefined over-wide shift.
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 100),
+            BOOST_BINARY(1 1 1 1 1 1 1 1));
+  EXPECT_EQ(BitUtil::TrailingBits(0, 1), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(0, 64), 0);
+  // Only the MSB set: it is excluded for every width except the full 64.
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 63), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 64), 1LL << 63);
+
+}
+
+// Exercises the shift helpers that return 0 instead of invoking undefined
+// behavior when the shift amount reaches or exceeds the 64-bit word width.
+TEST(BitUtil, ShiftBits) {
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0xFFFFFFFF00000000ULL);
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0x00000000FFFFFFFFULL);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bit-util.h b/be/src/kudu/util/bit-util.h new file mode 100644 index 0000000..5f36887 --- /dev/null +++ b/be/src/kudu/util/bit-util.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef IMPALA_BIT_UTIL_H +#define IMPALA_BIT_UTIL_H + +#include <stdint.h> +#include "kudu/gutil/port.h" + +namespace kudu { + +// Utility class to do standard bit tricks +// TODO: is this in boost or something else like that? +class BitUtil { + public: + // Returns the ceil of value/divisor + static inline int Ceil(int value, int divisor) { + return value / divisor + (value % divisor != 0); + } + + // Returns the 'num_bits' least-significant bits of 'v'. 
+ static inline uint64_t TrailingBits(uint64_t v, int num_bits) { + if (PREDICT_FALSE(num_bits == 0)) return 0; + if (PREDICT_FALSE(num_bits >= 64)) return v; + int n = 64 - num_bits; + return (v << n) >> n; + } + + static inline uint64_t ShiftLeftZeroOnOverflow(uint64_t v, int num_bits) { + if (PREDICT_FALSE(num_bits >= 64)) return 0; + return v << num_bits; + } + + static inline uint64_t ShiftRightZeroOnOverflow(uint64_t v, int num_bits) { + if (PREDICT_FALSE(num_bits >= 64)) return 0; + return v >> num_bits; + } + + +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bitmap-test.cc b/be/src/kudu/util/bitmap-test.cc new file mode 100644 index 0000000..a91e414 --- /dev/null +++ b/be/src/kudu/util/bitmap-test.cc @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <gtest/gtest.h> +#include <vector> + +#include "kudu/gutil/strings/join.h" +#include "kudu/util/bitmap.h" + +namespace kudu { + +static int ReadBackBitmap(uint8_t *bm, size_t bits, + std::vector<size_t> *result) { + int iters = 0; + for (TrueBitIterator iter(bm, bits); + !iter.done(); + ++iter) { + size_t val = *iter; + result->push_back(val); + + iters++; + } + return iters; +} + +TEST(TestBitMap, TestIteration) { + uint8_t bm[8]; + memset(bm, 0, sizeof(bm)); + BitmapSet(bm, 0); + BitmapSet(bm, 8); + BitmapSet(bm, 31); + BitmapSet(bm, 32); + BitmapSet(bm, 33); + BitmapSet(bm, 63); + + EXPECT_EQ(" 0: 10000000 10000000 00000000 00000001 11000000 00000000 00000000 00000001 \n", + BitmapToString(bm, sizeof(bm) * 8)); + + std::vector<size_t> read_back; + + int iters = ReadBackBitmap(bm, sizeof(bm)*8, &read_back); + ASSERT_EQ(6, iters); + ASSERT_EQ("0,8,31,32,33,63", JoinElements(read_back, ",")); +} + + +TEST(TestBitMap, TestIteration2) { + uint8_t bm[1]; + memset(bm, 0, sizeof(bm)); + BitmapSet(bm, 1); + + std::vector<size_t> read_back; + + int iters = ReadBackBitmap(bm, 3, &read_back); + ASSERT_EQ(1, iters); + ASSERT_EQ("1", JoinElements(read_back, ",")); +} + +TEST(TestBitmap, TestSetAndTestBits) { + uint8_t bm[1]; + memset(bm, 0, sizeof(bm)); + + size_t num_bits = sizeof(bm) * 8; + for (size_t i = 0; i < num_bits; i++) { + ASSERT_FALSE(BitmapTest(bm, i)); + + BitmapSet(bm, i); + ASSERT_TRUE(BitmapTest(bm, i)); + + BitmapClear(bm, i); + ASSERT_FALSE(BitmapTest(bm, i)); + + BitmapChange(bm, i, true); + ASSERT_TRUE(BitmapTest(bm, i)); + + BitmapChange(bm, i, false); + ASSERT_FALSE(BitmapTest(bm, i)); + } + + // Set the other bit: 01010101 + for (size_t i = 0; i < num_bits; ++i) { + ASSERT_FALSE(BitmapTest(bm, i)); + if (i & 1) BitmapSet(bm, i); + } + + // Check and Clear the other bit: 0000000 + for (size_t i = 0; i < num_bits; ++i) { + ASSERT_EQ(!!(i & 1), BitmapTest(bm, i)); + if (i & 1) BitmapClear(bm, i); + } + + // Check if bits are zero and change 
the other to one + for (size_t i = 0; i < num_bits; ++i) { + ASSERT_FALSE(BitmapTest(bm, i)); + BitmapChange(bm, i, i & 1); + } + + // Check the bits change them again + for (size_t i = 0; i < num_bits; ++i) { + ASSERT_EQ(!!(i & 1), BitmapTest(bm, i)); + BitmapChange(bm, i, !(i & 1)); + } + + // Check the last setup + for (size_t i = 0; i < num_bits; ++i) { + ASSERT_EQ(!(i & 1), BitmapTest(bm, i)); + } +} + +TEST(TestBitMap, TestBulkSetAndTestBits) { + uint8_t bm[16]; + size_t total_size = sizeof(bm) * 8; + + // Test Bulk change bits and test bits + for (int i = 0; i < 4; ++i) { + bool value = i & 1; + size_t num_bits = total_size; + while (num_bits > 0) { + for (size_t offset = 0; offset < num_bits; ++offset) { + BitmapChangeBits(bm, 0, total_size, !value); + BitmapChangeBits(bm, offset, num_bits - offset, value); + + ASSERT_EQ(value, BitMapIsAllSet(bm, offset, num_bits)); + ASSERT_EQ(!value, BitmapIsAllZero(bm, offset, num_bits)); + + if (offset > 1) { + ASSERT_EQ(value, BitmapIsAllZero(bm, 0, offset - 1)); + ASSERT_EQ(!value, BitMapIsAllSet(bm, 0, offset - 1)); + } + + if ((offset + num_bits) < total_size) { + ASSERT_EQ(value, BitmapIsAllZero(bm, num_bits, total_size)); + ASSERT_EQ(!value, BitMapIsAllSet(bm, num_bits, total_size)); + } + } + num_bits--; + } + } +} + +TEST(TestBitMap, TestFindBit) { + uint8_t bm[16]; + + size_t num_bits = sizeof(bm) * 8; + BitmapChangeBits(bm, 0, num_bits, false); + while (num_bits > 0) { + for (size_t offset = 0; offset < num_bits; ++offset) { + size_t idx; + ASSERT_FALSE(BitmapFindFirstSet(bm, offset, num_bits, &idx)); + ASSERT_TRUE(BitmapFindFirstZero(bm, offset, num_bits, &idx)); + ASSERT_EQ(idx, offset); + } + num_bits--; + } + + num_bits = sizeof(bm) * 8; + for (int i = 0; i < num_bits; ++i) { + BitmapChange(bm, i, i & 3); + } + + while (num_bits--) { + for (size_t offset = 0; offset < num_bits; ++offset) { + size_t idx; + + // Find a set bit + bool res = BitmapFindFirstSet(bm, offset, num_bits, &idx); + size_t 
expected_set_idx = (offset + !(offset & 3)); + bool expect_set_found = (expected_set_idx < num_bits); + ASSERT_EQ(expect_set_found, res); + if (expect_set_found) ASSERT_EQ(expected_set_idx, idx); + + // Find a zero bit + res = BitmapFindFirstZero(bm, offset, num_bits, &idx); + size_t expected_zero_idx = offset + ((offset & 3) ? (4 - (offset & 3)) : 0); + bool expect_zero_found = (expected_zero_idx < num_bits); + ASSERT_EQ(expect_zero_found, res); + if (expect_zero_found) ASSERT_EQ(expected_zero_idx, idx); + } + } +} + +TEST(TestBitMap, TestBitmapIteration) { + uint8_t bm[8]; + memset(bm, 0, sizeof(bm)); + BitmapSet(bm, 0); + BitmapSet(bm, 8); + BitmapSet(bm, 31); + BitmapSet(bm, 32); + BitmapSet(bm, 33); + BitmapSet(bm, 63); + + BitmapIterator biter(bm, sizeof(bm) * 8); + + size_t i = 0; + size_t size; + bool value = false; + bool expected_value = true; + size_t expected_sizes[] = {1, 7, 1, 22, 3, 29, 1, 0}; + while ((size = biter.Next(&value)) > 0) { + ASSERT_LT(i, 8); + ASSERT_EQ(expected_value, value); + ASSERT_EQ(expected_sizes[i], size); + expected_value = !expected_value; + i++; + } + ASSERT_EQ(expected_sizes[i], size); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bitmap.cc b/be/src/kudu/util/bitmap.cc new file mode 100644 index 0000000..e38f2e2 --- /dev/null +++ b/be/src/kudu/util/bitmap.cc @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include <glog/logging.h> +#include <string> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/util/bitmap.h" + +namespace kudu { + +void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value) { + DCHECK_GT(num_bits, 0); + + size_t start_byte = (offset >> 3); + size_t end_byte = (offset + num_bits - 1) >> 3; + int single_byte = (start_byte == end_byte); + + // Change the last bits of the first byte + size_t left = offset & 0x7; + size_t right = (single_byte) ? (left + num_bits) : 8; + uint8_t mask = ((0xff << left) & (0xff >> (8 - right))); + if (value) { + bitmap[start_byte++] |= mask; + } else { + bitmap[start_byte++] &= ~mask; + } + + // Nothing left... 
I'm done + if (single_byte) { + return; + } + + // change the middle bits + if (end_byte > start_byte) { + const uint8_t pattern8[2] = { 0x00, 0xff }; + memset(bitmap + start_byte, pattern8[value], end_byte - start_byte); + } + + // change the first bits of the last byte + right = offset + num_bits - (end_byte << 3); + mask = (0xff >> (8 - right)); + if (value) { + bitmap[end_byte] |= mask; + } else { + bitmap[end_byte] &= ~mask; + } +} + +bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size, + bool value, size_t *idx) { + const uint64_t pattern64[2] = { 0xffffffffffffffff, 0x0000000000000000 }; + const uint8_t pattern8[2] = { 0xff, 0x00 }; + size_t bit; + + DCHECK_LE(offset, bitmap_size); + + // Jump to the byte at specified offset + const uint8_t *p = bitmap + (offset >> 3); + size_t num_bits = bitmap_size - offset; + + // Find a 'value' bit at the end of the first byte + if ((bit = offset & 0x7)) { + for (; bit < 8 && num_bits > 0; ++bit) { + if (BitmapTest(p, bit) == value) { + *idx = ((p - bitmap) << 3) + bit; + return true; + } + + num_bits--; + } + + p++; + } + + // check 64bit at the time for a 'value' bit + const uint64_t *u64 = (const uint64_t *)p; + while (num_bits >= 64 && *u64 == pattern64[value]) { + num_bits -= 64; + u64++; + } + + // check 8bit at the time for a 'value' bit + p = (const uint8_t *)u64; + while (num_bits >= 8 && *p == pattern8[value]) { + num_bits -= 8; + p++; + } + + // Find a 'value' bit at the beginning of the last byte + for (bit = 0; num_bits > 0; ++bit) { + if (BitmapTest(p, bit) == value) { + *idx = ((p - bitmap) << 3) + bit; + return true; + } + num_bits--; + } + + return false; +} + +std::string BitmapToString(const uint8_t *bitmap, size_t num_bits) { + std::string s; + size_t index = 0; + while (index < num_bits) { + StringAppendF(&s, "%4zu: ", index); + for (int i = 0; i < 8 && index < num_bits; ++i) { + for (int j = 0; j < 8 && index < num_bits; ++j) { + StringAppendF(&s, "%d", BitmapTest(bitmap, 
index)); + index++; + } + StringAppendF(&s, " "); + } + StringAppendF(&s, "\n"); + } + return s; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bitmap.h b/be/src/kudu/util/bitmap.h new file mode 100644 index 0000000..1689b50 --- /dev/null +++ b/be/src/kudu/util/bitmap.h @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utility functions for dealing with a byte array as if it were a bitmap. +#ifndef KUDU_UTIL_BITMAP_H +#define KUDU_UTIL_BITMAP_H + +#include <string> +#include "kudu/gutil/bits.h" + +namespace kudu { + +// Return the number of bytes necessary to store the given number of bits. +inline size_t BitmapSize(size_t num_bits) { + return (num_bits + 7) / 8; +} + +// Set the given bit. +inline void BitmapSet(uint8_t *bitmap, size_t idx) { + bitmap[idx >> 3] |= 1 << (idx & 7); +} + +// Switch the given bit to the specified value. +inline void BitmapChange(uint8_t *bitmap, size_t idx, bool value) { + bitmap[idx >> 3] = (bitmap[idx >> 3] & ~(1 << (idx & 7))) | ((!!value) << (idx & 7)); +} + +// Clear the given bit. 
+inline void BitmapClear(uint8_t *bitmap, size_t idx) {
+  const uint8_t bit_mask = 1 << (idx % 8);
+  bitmap[idx / 8] &= ~bit_mask;
+}
+
+// Test/get the given bit.
+inline bool BitmapTest(const uint8_t *bitmap, size_t idx) {
+  const uint8_t bit_mask = 1 << (idx % 8);
+  return (bitmap[idx / 8] & bit_mask) != 0;
+}
+
+// Merge the two bitmaps using bitwise or. Both bitmaps should have at least
+// n_bits valid bits.
+inline void BitmapMergeOr(uint8_t *dst, const uint8_t *src, size_t n_bits) {
+  const size_t n_bytes = BitmapSize(n_bits);
+  for (size_t i = 0; i < n_bytes; i++) {
+    dst[i] |= src[i];
+  }
+}
+
+// Set bits from offset to (offset + num_bits) to the specified value
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value);
+
+// Find the first bit of the specified value, starting from the specified offset.
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+                     bool value, size_t *idx);
+
+// Find the first set bit in the bitmap, at the specified offset.
+inline bool BitmapFindFirstSet(const uint8_t *bitmap, size_t offset,
+                               size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, true, idx);
+}
+
+// Find the first zero bit in the bitmap, at the specified offset.
+inline bool BitmapFindFirstZero(const uint8_t *bitmap, size_t offset,
+                                size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, false, idx);
+}
+
+// Returns true if the bitmap contains only ones.
+inline bool BitMapIsAllSet(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  // All set <=> no zero bit can be found in the range.
+  size_t disproving_idx;
+  return !BitmapFindFirstZero(bitmap, offset, bitmap_size, &disproving_idx);
+}
+
+// Returns true if the bitmap contains only zeros.
+inline bool BitmapIsAllZero(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  // All zero <=> no set bit can be found in the range.
+  size_t disproving_idx;
+  return !BitmapFindFirstSet(bitmap, offset, bitmap_size, &disproving_idx);
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits);
+
+// Iterator which yields ranges of set and unset bits.
+// Example usage:
+//   bool value;
+//   size_t size;
+//   BitmapIterator iter(bitmap, n_bits);
+//   while ((size = iter.Next(&value))) {
+//     printf("bitmap block len=%lu value=%d\n", size, value);
+//   }
+class BitmapIterator {
+ public:
+  BitmapIterator(const uint8_t *map, size_t num_bits)
+    : offset_(0), num_bits_(num_bits), map_(map)
+  {}
+
+  // True once the cursor has consumed all bits.
+  bool done() const {
+    return offset_ == num_bits_;
+  }
+
+  // Reposition the cursor to the given bit index.
+  void SeekTo(size_t bit) {
+    DCHECK_LE(bit, num_bits_);
+    offset_ = bit;
+  }
+
+  // Yield the length of the next run of identical bits, storing the run's
+  // bit value in '*value'. Returns 0 once the bitmap is exhausted.
+  size_t Next(bool *value) {
+    const size_t remaining = num_bits_ - offset_;
+    if (PREDICT_FALSE(remaining == 0)) {
+      return 0;
+    }
+
+    // The run's value is whatever bit sits under the cursor.
+    *value = BitmapTest(map_, offset_);
+
+    // The run ends at the first bit of the opposite value, or at the end of
+    // the bitmap when no such bit exists.
+    size_t flip_pos;
+    if (!BitmapFindFirst(map_, offset_, num_bits_, !(*value), &flip_pos)) {
+      flip_pos = num_bits_;
+    }
+
+    const size_t run_len = flip_pos - offset_;
+    offset_ = flip_pos;
+    return run_len;
+  }
+
+ private:
+  size_t offset_;
+  size_t num_bits_;
+  const uint8_t *map_;
+};
+
+// Iterator which yields the set bits in a bitmap.
+// Example usage: +// for (TrueBitIterator iter(bitmap, n_bits); +// !iter.done(); +// ++iter) { +// int next_onebit_position = *iter; +// } +class TrueBitIterator { + public: + TrueBitIterator(const uint8_t *bitmap, size_t n_bits) + : bitmap_(bitmap), + cur_byte_(0), + cur_byte_idx_(0), + n_bits_(n_bits), + n_bytes_(BitmapSize(n_bits_)), + bit_idx_(0) { + if (n_bits_ == 0) { + cur_byte_idx_ = 1; // sets done + } else { + cur_byte_ = bitmap[0]; + AdvanceToNextOneBit(); + } + } + + TrueBitIterator &operator ++() { + DCHECK(!done()); + DCHECK(cur_byte_ & 1); + cur_byte_ &= (~1); + AdvanceToNextOneBit(); + return *this; + } + + bool done() const { + return cur_byte_idx_ >= n_bytes_; + } + + size_t operator *() const { + DCHECK(!done()); + return bit_idx_; + } + + private: + void AdvanceToNextOneBit() { + while (cur_byte_ == 0) { + cur_byte_idx_++; + if (cur_byte_idx_ >= n_bytes_) return; + cur_byte_ = bitmap_[cur_byte_idx_]; + bit_idx_ = cur_byte_idx_ * 8; + } + DVLOG(2) << "Found next nonzero byte at " << cur_byte_idx_ + << " val=" << cur_byte_; + + DCHECK_NE(cur_byte_, 0); + int set_bit = Bits::FindLSBSetNonZero(cur_byte_); + bit_idx_ += set_bit; + cur_byte_ >>= set_bit; + } + + const uint8_t *bitmap_; + uint8_t cur_byte_; + uint8_t cur_byte_idx_; + + const size_t n_bits_; + const size_t n_bytes_; + size_t bit_idx_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/blocking_queue-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/blocking_queue-test.cc b/be/src/kudu/util/blocking_queue-test.cc new file mode 100644 index 0000000..00f558a --- /dev/null +++ b/be/src/kudu/util/blocking_queue-test.cc @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <map> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include "kudu/util/countdown_latch.h" +#include "kudu/util/blocking_queue.h" +#include "kudu/util/status.h" +#include "kudu/util/test_macros.h" + +using std::string; +using std::thread; +using std::vector; + +namespace kudu { + +BlockingQueue<int32_t> test1_queue(5); + +void InsertSomeThings() { + ASSERT_EQ(test1_queue.Put(1), QUEUE_SUCCESS); + ASSERT_EQ(test1_queue.Put(2), QUEUE_SUCCESS); + ASSERT_EQ(test1_queue.Put(3), QUEUE_SUCCESS); +} + +TEST(BlockingQueueTest, Test1) { + thread inserter_thread(InsertSomeThings); + int32_t i; + ASSERT_TRUE(test1_queue.BlockingGet(&i)); + ASSERT_EQ(1, i); + ASSERT_TRUE(test1_queue.BlockingGet(&i)); + ASSERT_EQ(2, i); + ASSERT_TRUE(test1_queue.BlockingGet(&i)); + ASSERT_EQ(3, i); + inserter_thread.join(); +} + +TEST(BlockingQueueTest, TestBlockingDrainTo) { + BlockingQueue<int32_t> test_queue(3); + ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS); + vector<int32_t> out; + ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30))); + ASSERT_EQ(1, out[0]); + ASSERT_EQ(2, out[1]); + ASSERT_EQ(3, out[2]); + + // Set a deadline in the past and ensure we time out. 
+ Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1)); + ASSERT_TRUE(s.IsTimedOut()); + + // Ensure that if the queue is shut down, we get Aborted status. + test_queue.Shutdown(); + s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1)); + ASSERT_TRUE(s.IsAborted()); +} + +// Test that, when the queue is shut down with elements still pending, +// Drain still returns OK until the elements are all gone. +TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) { + // Put some elements into the queue and then shut it down. + BlockingQueue<int32_t> q(3); + ASSERT_EQ(q.Put(1), QUEUE_SUCCESS); + ASSERT_EQ(q.Put(2), QUEUE_SUCCESS); + + q.Shutdown(); + + // Get() should still return an element. + int i; + ASSERT_TRUE(q.BlockingGet(&i)); + ASSERT_EQ(1, i); + + // Drain should still return OK, since it yielded elements. + vector<int32_t> out; + ASSERT_OK(q.BlockingDrainTo(&out)); + ASSERT_EQ(2, out[0]); + + // Now that it's empty, it should return Aborted. + Status s = q.BlockingDrainTo(&out); + ASSERT_TRUE(s.IsAborted()) << s.ToString(); + ASSERT_FALSE(q.BlockingGet(&i)); +} + +TEST(BlockingQueueTest, TestTooManyInsertions) { + BlockingQueue<int32_t> test_queue(2); + ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put(123), QUEUE_FULL); +} + +namespace { + +struct LengthLogicalSize { + static size_t logical_size(const string& s) { + return s.length(); + } +}; + +} // anonymous namespace + +TEST(BlockingQueueTest, TestLogicalSize) { + BlockingQueue<string, LengthLogicalSize> test_queue(4); + ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS); + ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL); +} + +TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) { + BlockingQueue<int32_t> test_queue(1); + ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); + // No DCHECK failure on destruct. 
+} + +#ifndef NDEBUG +TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) { + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + ASSERT_DEATH({ + BlockingQueue<int32_t*> test_queue(1); + int32_t element = 123; + ASSERT_EQ(test_queue.Put(&element), QUEUE_SUCCESS); + // Debug assertion triggered on queue destruction since type is a pointer. + }, + "BlockingQueue holds bare pointers"); +} +#endif // NDEBUG + +TEST(BlockingQueueTest, TestGetFromShutdownQueue) { + BlockingQueue<int64_t> test_queue(2); + ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS); + test_queue.Shutdown(); + ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN); + int64_t i; + ASSERT_TRUE(test_queue.BlockingGet(&i)); + ASSERT_EQ(123, i); + ASSERT_FALSE(test_queue.BlockingGet(&i)); +} + +TEST(BlockingQueueTest, TestGscopedPtrMethods) { + BlockingQueue<int*> test_queue(2); + gscoped_ptr<int> input_int(new int(123)); + ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS); + gscoped_ptr<int> output_int; + ASSERT_TRUE(test_queue.BlockingGet(&output_int)); + ASSERT_EQ(123, *output_int.get()); + test_queue.Shutdown(); +} + +class MultiThreadTest { + public: + MultiThreadTest() + : puts_(4), + blocking_puts_(4), + nthreads_(5), + queue_(nthreads_ * puts_), + num_inserters_(nthreads_), + sync_latch_(nthreads_) { + } + + void InserterThread(int arg) { + for (int i = 0; i < puts_; i++) { + ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS); + } + sync_latch_.CountDown(); + sync_latch_.Wait(); + for (int i = 0; i < blocking_puts_; i++) { + ASSERT_TRUE(queue_.BlockingPut(arg)); + } + MutexLock guard(lock_); + if (--num_inserters_ == 0) { + queue_.Shutdown(); + } + } + + void RemoverThread() { + for (int i = 0; i < puts_ + blocking_puts_; i++) { + int32_t arg = 0; + bool got = queue_.BlockingGet(&arg); + if (!got) { + arg = -1; + } + MutexLock guard(lock_); + gotten_[arg] = gotten_[arg] + 1; + } + } + + void Run() { + for (int i = 0; i < nthreads_; i++) { + 
threads_.emplace_back(&MultiThreadTest::InserterThread, this, i); + threads_.emplace_back(&MultiThreadTest::RemoverThread, this); + } + // We add an extra thread to ensure that there aren't enough elements in + // the queue to go around. This way, we test removal after Shutdown. + threads_.emplace_back(&MultiThreadTest::RemoverThread, this); + for (auto& thread : threads_) { + thread.join(); + } + // Let's check to make sure we got what we should have. + MutexLock guard(lock_); + for (int i = 0; i < nthreads_; i++) { + ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]); + } + // And there were nthreads_ * (puts_ + blocking_puts_) + // elements removed, but only nthreads_ * puts_ + + // blocking_puts_ elements added. So some removers hit the + // shutdown case. + ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]); + } + + int puts_; + int blocking_puts_; + int nthreads_; + BlockingQueue<int32_t> queue_; + Mutex lock_; + std::map<int32_t, int> gotten_; + vector<thread> threads_; + int num_inserters_; + CountDownLatch sync_latch_; +}; + +TEST(BlockingQueueTest, TestMultipleThreads) { + MultiThreadTest test; + test.Run(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/blocking_queue.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/blocking_queue.h b/be/src/kudu/util/blocking_queue.h new file mode 100644 index 0000000..2907030 --- /dev/null +++ b/be/src/kudu/util/blocking_queue.h @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_BLOCKING_QUEUE_H +#define KUDU_UTIL_BLOCKING_QUEUE_H + +#include <list> +#include <string> +#include <type_traits> +#include <unistd.h> +#include <vector> + +#include "kudu/gutil/basictypes.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" +#include "kudu/util/status.h" + +namespace kudu { + +// Return values for BlockingQueue::Put() +enum QueueStatus { + QUEUE_SUCCESS = 0, + QUEUE_SHUTDOWN = 1, + QUEUE_FULL = 2 +}; + +// Default logical length implementation: always returns 1. +struct DefaultLogicalSize { + template<typename T> + static size_t logical_size(const T& /* unused */) { + return 1; + } +}; + +template <typename T, class LOGICAL_SIZE = DefaultLogicalSize> +class BlockingQueue { + public: + // If T is a pointer, this will be the base type. If T is not a pointer, you + // can ignore this and the functions which make use of it. + // Template substitution failure is not an error. + typedef typename std::remove_pointer<T>::type T_VAL; + + explicit BlockingQueue(size_t max_size) + : shutdown_(false), + size_(0), + max_size_(max_size), + not_empty_(&lock_), + not_full_(&lock_) { + } + + // If the queue holds a bare pointer, it must be empty on destruction, since + // it may have ownership of the pointer. + ~BlockingQueue() { + DCHECK(list_.empty() || !std::is_pointer<T>::value) + << "BlockingQueue holds bare pointers at destruction time"; + } + + // Get an element from the queue. Returns false if we were shut down prior to + // getting the element. 
+  bool BlockingGet(T *out) {
+    MutexLock l(lock_);
+    // Loop so the predicate is re-checked after every wakeup.
+    while (true) {
+      if (!list_.empty()) {
+        *out = list_.front();
+        list_.pop_front();
+        decrement_size_unlocked(*out);
+        // A slot was freed: wake a producer blocked in BlockingPut().
+        not_full_.Signal();
+        return true;
+      }
+      // Shutdown is only reported once the queue has fully drained, so
+      // callers still receive every element enqueued before Shutdown().
+      if (shutdown_) {
+        return false;
+      }
+      not_empty_.Wait();
+    }
+  }
+
+  // Get an element from the queue.  Returns false if the queue is empty and
+  // we were shut down prior to getting the element.
+  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
+    // Only meaningful when T is a pointer type (T_VAL is its pointee).
+    T t = NULL;
+    bool got_element = BlockingGet(&t);
+    if (!got_element) {
+      return false;
+    }
+    // The caller's gscoped_ptr assumes ownership of the dequeued pointer.
+    out->reset(t);
+    return true;
+  }
+
+  // Get all elements from the queue and append them to a vector.
+  //
+  // If 'deadline' passes and no elements have been returned from the
+  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+  // no deadline is used.
+  //
+  // If the queue has been shut down, but there are still elements waiting,
+  // then it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        // NOTE(review): reserve() is passed only list_.size(); if 'out'
+        // already holds elements this under-reserves (still correct, but
+        // the vector may reallocate during the appends).
+        out->reserve(list_.size());
+        for (const T& elt : list_) {
+          out->push_back(elt);
+          decrement_size_unlocked(elt);
+        }
+        list_.clear();
+        // The queue is now empty: wake a producer waiting for space.
+        not_full_.Signal();
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(shutdown_)) {
+        return Status::Aborted("");
+      }
+      // With no deadline, wait indefinitely; otherwise wait only for the
+      // time remaining until 'deadline'.
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - MonoTime::Now()))) {
+        return Status::TimedOut("");
+      }
+    }
+  }
+
+  // Attempts to put the given value in the queue.
+ // Returns: + // QUEUE_SUCCESS: if successfully inserted + // QUEUE_FULL: if the queue has reached max_size + // QUEUE_SHUTDOWN: if someone has already called Shutdown() + QueueStatus Put(const T &val) { + MutexLock l(lock_); + if (size_ >= max_size_) { + return QUEUE_FULL; + } + if (shutdown_) { + return QUEUE_SHUTDOWN; + } + list_.push_back(val); + increment_size_unlocked(val); + l.Unlock(); + not_empty_.Signal(); + return QUEUE_SUCCESS; + } + + // Returns the same as the other Put() overload above. + // If the element was inserted, the gscoped_ptr releases its contents. + QueueStatus Put(gscoped_ptr<T_VAL> *val) { + QueueStatus s = Put(val->get()); + if (s == QUEUE_SUCCESS) { + ignore_result<>(val->release()); + } + return s; + } + + // Gets an element for the queue; if the queue is full, blocks until + // space becomes available. Returns false if we were shutdown prior + // to enqueueing the element. + bool BlockingPut(const T& val) { + MutexLock l(lock_); + while (true) { + if (shutdown_) { + return false; + } + if (size_ < max_size_) { + list_.push_back(val); + increment_size_unlocked(val); + l.Unlock(); + not_empty_.Signal(); + return true; + } + not_full_.Wait(); + } + } + + // Same as other BlockingPut() overload above. If the element was + // enqueued, gscoped_ptr releases its contents. + bool BlockingPut(gscoped_ptr<T_VAL>* val) { + bool ret = Put(val->get()); + if (ret) { + ignore_result(val->release()); + } + return ret; + } + + // Shut down the queue. + // When a blocking queue is shut down, no more elements can be added to it, + // and Put() will return QUEUE_SHUTDOWN. + // Existing elements will drain out of it, and then BlockingGet will start + // returning false. 
+ void Shutdown() { + MutexLock l(lock_); + shutdown_ = true; + not_full_.Broadcast(); + not_empty_.Broadcast(); + } + + bool empty() const { + MutexLock l(lock_); + return list_.empty(); + } + + size_t max_size() const { + return max_size_; + } + + std::string ToString() const { + std::string ret; + + MutexLock l(lock_); + for (const T& t : list_) { + ret.append(t->ToString()); + ret.append("\n"); + } + return ret; + } + + private: + + // Increments queue size. Must be called when 'lock_' is held. + void increment_size_unlocked(const T& t) { + size_ += LOGICAL_SIZE::logical_size(t); + } + + // Decrements queue size. Must be called when 'lock_' is held. + void decrement_size_unlocked(const T& t) { + size_ -= LOGICAL_SIZE::logical_size(t); + } + + bool shutdown_; + size_t size_; + size_t max_size_; + mutable Mutex lock_; + ConditionVariable not_empty_; + ConditionVariable not_full_; + std::list<T> list_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bloom_filter-test.cc b/be/src/kudu/util/bloom_filter-test.cc new file mode 100644 index 0000000..3cee985 --- /dev/null +++ b/be/src/kudu/util/bloom_filter-test.cc @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <stdlib.h> +#include "kudu/util/bloom_filter.h" + +namespace kudu { + +static const int kRandomSeed = 0xdeadbeef; + +static void AddRandomKeys(int random_seed, int n_keys, BloomFilterBuilder *bf) { + srandom(random_seed); + for (int i = 0; i < n_keys; i++) { + uint64_t key = random(); + Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key)); + BloomKeyProbe probe(key_slice); + bf->AddKey(probe); + } +} + +static void CheckRandomKeys(int random_seed, int n_keys, const BloomFilter &bf) { + srandom(random_seed); + for (int i = 0; i < n_keys; i++) { + uint64_t key = random(); + Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key)); + BloomKeyProbe probe(key_slice); + ASSERT_TRUE(bf.MayContainKey(probe)); + } +} + +TEST(TestBloomFilter, TestInsertAndProbe) { + int n_keys = 2000; + BloomFilterBuilder bfb( + BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); + + // Check that the desired false positive rate is achieved. + double expected_fp_rate = bfb.false_positive_rate(); + ASSERT_NEAR(expected_fp_rate, 0.01, 0.002); + + // 1% FP rate should need about 9 bits per key + ASSERT_EQ(9, bfb.n_bits() / n_keys); + + // Enter n_keys random keys into the bloom filter + AddRandomKeys(kRandomSeed, n_keys, &bfb); + + // Verify that the keys we inserted all return true when queried. 
+ BloomFilter bf(bfb.slice(), bfb.n_hashes()); + CheckRandomKeys(kRandomSeed, n_keys, bf); + + // Query a bunch of other keys, and verify the false positive rate + // is within reasonable bounds. + uint32_t num_queries = 100000; + uint32_t num_positives = 0; + for (int i = 0; i < num_queries; i++) { + uint64_t key = random(); + Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key)); + BloomKeyProbe probe(key_slice); + if (bf.MayContainKey(probe)) { + num_positives++; + } + } + + double fp_rate = static_cast<double>(num_positives) / static_cast<double>(num_queries); + LOG(INFO) << "FP rate: " << fp_rate << " (" << num_positives << "/" << num_queries << ")"; + LOG(INFO) << "Expected FP rate: " << expected_fp_rate; + + // Actual FP rate should be within 20% of the estimated FP rate + ASSERT_NEAR(fp_rate, expected_fp_rate, 0.20*expected_fp_rate); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bloom_filter.cc b/be/src/kudu/util/bloom_filter.cc new file mode 100644 index 0000000..2b48b0d --- /dev/null +++ b/be/src/kudu/util/bloom_filter.cc @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <math.h> + +#include "kudu/util/bloom_filter.h" +#include "kudu/util/bitmap.h" + +namespace kudu { + +static double kNaturalLog2 = 0.69314; + +static int ComputeOptimalHashCount(size_t n_bits, size_t elems) { + int n_hashes = n_bits * kNaturalLog2 / elems; + if (n_hashes < 1) n_hashes = 1; + return n_hashes; +} + +BloomFilterSizing BloomFilterSizing::ByCountAndFPRate( + size_t expected_count, double fp_rate) { + CHECK_GT(fp_rate, 0); + CHECK_LT(fp_rate, 1); + + double n_bits = -static_cast<double>(expected_count) * log(fp_rate) + / kNaturalLog2 / kNaturalLog2; + int n_bytes = static_cast<int>(ceil(n_bits / 8)); + CHECK_GT(n_bytes, 0) + << "expected_count: " << expected_count + << " fp_rate: " << fp_rate; + return BloomFilterSizing(n_bytes, expected_count); +} + +BloomFilterSizing BloomFilterSizing::BySizeAndFPRate(size_t n_bytes, double fp_rate) { + size_t n_bits = n_bytes * 8; + double expected_elems = -static_cast<double>(n_bits) * kNaturalLog2 * kNaturalLog2 / + log(fp_rate); + DCHECK_GT(expected_elems, 1); + return BloomFilterSizing(n_bytes, (size_t)ceil(expected_elems)); +} + + +BloomFilterBuilder::BloomFilterBuilder(const BloomFilterSizing &sizing) + : n_bits_(sizing.n_bytes() * 8), + bitmap_(new uint8_t[sizing.n_bytes()]), + n_hashes_(ComputeOptimalHashCount(n_bits_, sizing.expected_count())), + expected_count_(sizing.expected_count()), + n_inserted_(0) { + Clear(); +} + +void BloomFilterBuilder::Clear() { + memset(&bitmap_[0], 0, n_bytes()); + n_inserted_ = 0; +} + +double BloomFilterBuilder::false_positive_rate() const { + CHECK_NE(expected_count_, 0) + << "expected_count_ not initialized: can't call this function on " + << "a BloomFilter initialized from external data"; + + return pow(1 - exp(-static_cast<double>(n_hashes_) * expected_count_ / n_bits_), n_hashes_); +} + +BloomFilter::BloomFilter(const Slice &data, size_t n_hashes) + : 
n_bits_(data.size() * 8), + bitmap_(reinterpret_cast<const uint8_t *>(data.data())), + n_hashes_(n_hashes) +{} + + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/bloom_filter.h b/be/src/kudu/util/bloom_filter.h new file mode 100644 index 0000000..419de3b --- /dev/null +++ b/be/src/kudu/util/bloom_filter.h @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_BLOOM_FILTER_H +#define KUDU_UTIL_BLOOM_FILTER_H + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/hash/city.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/bitmap.h" +#include "kudu/util/slice.h" + +namespace kudu { + +// Probe calculated from a given key. This caches the calculated +// hash values which are necessary for probing into a Bloom Filter, +// so that when many bloom filters have to be consulted for a given +// key, we only need to calculate the hashes once. 
+// +// This is implemented based on the idea of double-hashing from the following paper: +// "Less Hashing, Same Performance: Building a Better Bloom Filter" +// Kirsch and Mitzenmacher, ESA 2006 +// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf +// +// Currently, the implementation uses the 64-bit City Hash. +// TODO: an SSE CRC32 hash is probably ~20% faster. Come back to this +// at some point. +class BloomKeyProbe { + public: + // Default constructor - this is only used to instantiate an object + // and later reassign by assignment from another instance + BloomKeyProbe() {} + + // Construct a probe from the given key. + // + // NOTE: proper operation requires that the referenced memory remain + // valid for the lifetime of this object. + explicit BloomKeyProbe(const Slice &key) : key_(key) { + uint64_t h = util_hash::CityHash64( + reinterpret_cast<const char *>(key.data()), + key.size()); + + // Use the top and bottom halves of the 64-bit hash + // as the two independent hash functions for mixing. + h_1_ = static_cast<uint32>(h); + h_2_ = static_cast<uint32>(h >> 32); + } + + const Slice &key() const { return key_; } + + // The initial hash value. See MixHash() for usage example. + uint32_t initial_hash() const { + return h_1_; + } + + // Mix the given hash function with the second calculated hash + // value. A sequence of independent hashes can be calculated + // by repeatedly calling MixHash() on its previous result. + uint32_t MixHash(uint32_t h) const { + return h + h_2_; + } + + private: + Slice key_; + + // The two hashes. + uint32_t h_1_; + uint32_t h_2_; +}; + +// Sizing parameters for the constructor to BloomFilterBuilder. +// This is simply to provide a nicer API than a bunch of overloaded +// constructors. +class BloomFilterSizing { + public: + // Size the bloom filter by a fixed size and false positive rate. + // + // Picks the number of entries to achieve the above. 
+ static BloomFilterSizing BySizeAndFPRate(size_t n_bytes, double fp_rate); + + // Size the bloom filer by an expected count and false positive rate. + // + // Picks the number of bytes to achieve the above. + static BloomFilterSizing ByCountAndFPRate(size_t expected_count, double fp_rate); + + size_t n_bytes() const { return n_bytes_; } + size_t expected_count() const { return expected_count_; } + + private: + BloomFilterSizing(size_t n_bytes, size_t expected_count) : + n_bytes_(n_bytes), + expected_count_(expected_count) + {} + + size_t n_bytes_; + size_t expected_count_; +}; + + +// Builder for a BloomFilter structure. +class BloomFilterBuilder { + public: + // Create a bloom filter. + // See BloomFilterSizing static methods to specify this argument. + explicit BloomFilterBuilder(const BloomFilterSizing &sizing); + + // Clear all entries, reset insertion count. + void Clear(); + + // Add the given key to the bloom filter. + void AddKey(const BloomKeyProbe &probe); + + // Return an estimate of the false positive rate. + double false_positive_rate() const; + + int n_bytes() const { + return n_bits_ / 8; + } + + int n_bits() const { + return n_bits_; + } + + // Return a slice view into this Bloom Filter, suitable for + // writing out to a file. + const Slice slice() const { + return Slice(&bitmap_[0], n_bytes()); + } + + // Return the number of hashes that are calculated for each entry + // in the bloom filter. + size_t n_hashes() const { return n_hashes_; } + + size_t expected_count() const { return expected_count_; } + + // Return the number of keys inserted. + size_t count() const { return n_inserted_; } + + private: + DISALLOW_COPY_AND_ASSIGN(BloomFilterBuilder); + + size_t n_bits_; + gscoped_array<uint8_t> bitmap_; + + // The number of hash functions to compute. + size_t n_hashes_; + + // The expected number of elements, for which the bloom is optimized. + size_t expected_count_; + + // The number of elements inserted so far since the last Reset. 
+ size_t n_inserted_; +}; + + +// Wrapper around a byte array for reading it as a bloom filter. +class BloomFilter { + public: + BloomFilter() : bitmap_(nullptr) {} + BloomFilter(const Slice &data, size_t n_hashes); + + // Return true if the filter may contain the given key. + bool MayContainKey(const BloomKeyProbe &probe) const; + + private: + friend class BloomFilterBuilder; + static uint32_t PickBit(uint32_t hash, size_t n_bits); + + size_t n_bits_; + const uint8_t *bitmap_; + + size_t n_hashes_; +}; + + +//////////////////////////////////////////////////////////// +// Inline implementations +//////////////////////////////////////////////////////////// + +inline uint32_t BloomFilter::PickBit(uint32_t hash, size_t n_bits) { + switch (n_bits) { + // Fast path for the default bloom filter block size. Bitwise math + // is much faster than division. + case 4096 * 8: + return hash & (n_bits - 1); + + default: + return hash % n_bits; + } +} + +inline void BloomFilterBuilder::AddKey(const BloomKeyProbe &probe) { + uint32_t h = probe.initial_hash(); + for (size_t i = 0; i < n_hashes_; i++) { + uint32_t bitpos = BloomFilter::PickBit(h, n_bits_); + BitmapSet(&bitmap_[0], bitpos); + h = probe.MixHash(h); + } + n_inserted_++; +} + +inline bool BloomFilter::MayContainKey(const BloomKeyProbe &probe) const { + uint32_t h = probe.initial_hash(); + + // Basic unrolling by 2s gives a small benefit here since the two bit positions + // can be calculated in parallel -- it's a 50% chance that the first will be + // set even if it's a bloom miss, in which case we can parallelize the load. 
+ int rem_hashes = n_hashes_; + while (rem_hashes >= 2) { + uint32_t bitpos1 = PickBit(h, n_bits_); + h = probe.MixHash(h); + uint32_t bitpos2 = PickBit(h, n_bits_); + h = probe.MixHash(h); + + if (!BitmapTest(&bitmap_[0], bitpos1) || + !BitmapTest(&bitmap_[0], bitpos2)) { + return false; + } + + rem_hashes -= 2; + } + + while (rem_hashes) { + uint32_t bitpos = PickBit(h, n_bits_); + if (!BitmapTest(&bitmap_[0], bitpos)) { + return false; + } + h = probe.MixHash(h); + rem_hashes--; + } + return true; +} + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/boost_mutex_utils.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/boost_mutex_utils.h b/be/src/kudu/util/boost_mutex_utils.h new file mode 100644 index 0000000..6f6390b --- /dev/null +++ b/be/src/kudu/util/boost_mutex_utils.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_BOOST_MUTEX_UTILS_H +#define KUDU_BOOST_MUTEX_UTILS_H + + +// Similar to std::lock_guard except that it takes +// a lock pointer, and checks against nullptr. If the +// pointer is NULL, does nothing. Otherwise guards +// with the lock. 
+template<class LockType> +class lock_guard_maybe { + public: + explicit lock_guard_maybe(LockType *l) : + lock_(l) { + if (l != nullptr) { + l->lock(); + } + } + + ~lock_guard_maybe() { + if (lock_ != nullptr) { + lock_->unlock(); + } + } + + private: + LockType *lock_; +}; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache-test.cc b/be/src/kudu/util/cache-test.cc new file mode 100644 index 0000000..7319cf9 --- /dev/null +++ b/be/src/kudu/util/cache-test.cc @@ -0,0 +1,234 @@ +// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <glog/logging.h> +#include <gflags/gflags.h> +#include <gtest/gtest.h> +#include <memory> + +#include <vector> +#include "kudu/util/cache.h" +#include "kudu/util/coding.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/metrics.h" +#include "kudu/util/test_util.h" + +#if defined(__linux__) +DECLARE_string(nvm_cache_path); +#endif // defined(__linux__) + +namespace kudu { + +// Conversions between numeric keys/values and the types expected by Cache. 
+static std::string EncodeInt(int k) { + faststring result; + PutFixed32(&result, k); + return result.ToString(); +} +static int DecodeInt(const Slice& k) { + assert(k.size() == 4); + return DecodeFixed32(k.data()); +} + +class CacheTest : public KuduTest, + public ::testing::WithParamInterface<CacheType>, + public Cache::EvictionCallback { + public: + + // Implementation of the EvictionCallback interface + void EvictedEntry(Slice key, Slice val) override { + evicted_keys_.push_back(DecodeInt(key)); + evicted_values_.push_back(DecodeInt(val)); + } + std::vector<int> evicted_keys_; + std::vector<int> evicted_values_; + std::shared_ptr<MemTracker> mem_tracker_; + gscoped_ptr<Cache> cache_; + MetricRegistry metric_registry_; + + static const int kCacheSize = 14*1024*1024; + + virtual void SetUp() OVERRIDE { + +#if defined(__linux__) + if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) { + FLAGS_nvm_cache_path = GetTestPath("nvm-cache"); + ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path)); + } +#endif // defined(__linux__) + + cache_.reset(NewLRUCache(GetParam(), kCacheSize, "cache_test")); + + MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_); + // Since nvm cache does not have memtracker due to the use of + // tcmalloc for this we only check for it in the DRAM case. + if (GetParam() == DRAM_CACHE) { + ASSERT_TRUE(mem_tracker_.get()); + } + + scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate( + &metric_registry_, "test"); + cache_->SetMetrics(entity); + } + + int Lookup(int key) { + Cache::Handle* handle = cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE); + const int r = (handle == nullptr) ? 
-1 : DecodeInt(cache_->Value(handle)); + if (handle != nullptr) { + cache_->Release(handle); + } + return r; + } + + void Insert(int key, int value, int charge = 1) { + string key_str = EncodeInt(key); + string val_str = EncodeInt(value); + Cache::PendingHandle* handle = CHECK_NOTNULL(cache_->Allocate(key_str, val_str.size(), charge)); + memcpy(cache_->MutableValue(handle), val_str.data(), val_str.size()); + + cache_->Release(cache_->Insert(handle, this)); + } + + void Erase(int key) { + cache_->Erase(EncodeInt(key)); + } +}; + +#if defined(__linux__) +INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE, NVM_CACHE)); +#else +INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE)); +#endif // defined(__linux__) + +TEST_P(CacheTest, TrackMemory) { + if (mem_tracker_) { + Insert(100, 100, 1); + ASSERT_EQ(1, mem_tracker_->consumption()); + Erase(100); + ASSERT_EQ(0, mem_tracker_->consumption()); + ASSERT_EQ(1, mem_tracker_->peak_consumption()); + } +} + +TEST_P(CacheTest, HitAndMiss) { + ASSERT_EQ(-1, Lookup(100)); + + Insert(100, 101); + ASSERT_EQ(101, Lookup(100)); + ASSERT_EQ(-1, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + Insert(200, 201); + ASSERT_EQ(101, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + Insert(100, 102); + ASSERT_EQ(102, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(-1, Lookup(300)); + + ASSERT_EQ(1, evicted_keys_.size()); + ASSERT_EQ(100, evicted_keys_[0]); + ASSERT_EQ(101, evicted_values_[0]); +} + +TEST_P(CacheTest, Erase) { + Erase(200); + ASSERT_EQ(0, evicted_keys_.size()); + + Insert(100, 101); + Insert(200, 201); + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(1, evicted_keys_.size()); + ASSERT_EQ(100, evicted_keys_[0]); + ASSERT_EQ(101, evicted_values_[0]); + + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(201, Lookup(200)); + ASSERT_EQ(1, evicted_keys_.size()); +} + +TEST_P(CacheTest, EntriesArePinned) 
{ + Insert(100, 101); + Cache::Handle* h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE); + ASSERT_EQ(101, DecodeInt(cache_->Value(h1))); + + Insert(100, 102); + Cache::Handle* h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE); + ASSERT_EQ(102, DecodeInt(cache_->Value(h2))); + ASSERT_EQ(0, evicted_keys_.size()); + + cache_->Release(h1); + ASSERT_EQ(1, evicted_keys_.size()); + ASSERT_EQ(100, evicted_keys_[0]); + ASSERT_EQ(101, evicted_values_[0]); + + Erase(100); + ASSERT_EQ(-1, Lookup(100)); + ASSERT_EQ(1, evicted_keys_.size()); + + cache_->Release(h2); + ASSERT_EQ(2, evicted_keys_.size()); + ASSERT_EQ(100, evicted_keys_[1]); + ASSERT_EQ(102, evicted_values_[1]); +} + +TEST_P(CacheTest, EvictionPolicy) { + Insert(100, 101); + Insert(200, 201); + + const int kNumElems = 1000; + const int kSizePerElem = kCacheSize / kNumElems; + + // Loop adding and looking up new entries, but repeatedly accessing key 101. This + // frequently-used entry should not be evicted. + for (int i = 0; i < kNumElems + 1000; i++) { + Insert(1000+i, 2000+i, kSizePerElem); + ASSERT_EQ(2000+i, Lookup(1000+i)); + ASSERT_EQ(101, Lookup(100)); + } + ASSERT_EQ(101, Lookup(100)); + // Since '200' wasn't accessed in the loop above, it should have + // been evicted. + ASSERT_EQ(-1, Lookup(200)); +} + +TEST_P(CacheTest, HeavyEntries) { + // Add a bunch of light and heavy entries and then count the combined + // size of items still in the cache, which must be approximately the + // same as the total capacity. + const int kLight = kCacheSize/1000; + const int kHeavy = kCacheSize/100; + int added = 0; + int index = 0; + while (added < 2*kCacheSize) { + const int weight = (index & 1) ? kLight : kHeavy; + Insert(index, 1000+index, weight); + added += weight; + index++; + } + + int cached_weight = 0; + for (int i = 0; i < index; i++) { + const int weight = (i & 1 ? 
kLight : kHeavy); + int r = Lookup(i); + if (r >= 0) { + cached_weight += weight; + ASSERT_EQ(1000+i, r); + } + } + ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10); +} + +TEST_P(CacheTest, NewId) { + uint64_t a = cache_->NewId(); + uint64_t b = cache_->NewId(); + ASSERT_NE(a, b); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cache.cc b/be/src/kudu/util/cache.cc new file mode 100644 index 0000000..4700fb3 --- /dev/null +++ b/be/src/kudu/util/cache.cc @@ -0,0 +1,512 @@ +// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <cstdlib> +#include <memory> +#include <mutex> +#include <string> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/atomic_refcount.h" +#include "kudu/gutil/bits.h" +#include "kudu/gutil/hash/city.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/util/alignment.h" +#include "kudu/util/atomic.h" +#include "kudu/util/cache.h" +#include "kudu/util/cache_metrics.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/locks.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/metrics.h" + +#if !defined(__APPLE__) +#include "kudu/util/nvm_cache.h" +#endif + +// Useful in tests that require accurate cache capacity accounting. 
DEFINE_bool(cache_force_single_shard, false,
            "Override all cache implementations to use just one shard");
TAG_FLAG(cache_force_single_shard, hidden);

namespace kudu {

class MetricEntity;

Cache::~Cache() {
}

namespace {

using std::shared_ptr;
using std::vector;

// Mutex type guarding each cache shard's mutable state.
typedef simple_spinlock MutexType;

// LRU cache implementation

// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
  // Invoked (when non-null) with the entry's key/value when the entry
  // is freed after its last reference is dropped.
  Cache::EvictionCallback* eviction_callback;
  // Next entry in the same hash-table bucket chain.
  LRUHandle* next_hash;
  // Neighbors in the circular doubly-linked LRU list.
  LRUHandle* next;
  LRUHandle* prev;
  size_t charge;      // TODO(opt): Only allow uint32_t?
  uint32_t key_length;
  uint32_t val_length;
  // Reference count, manipulated with atomic refcount ops elsewhere in
  // this file; the cache itself holds one reference while resident.
  Atomic32 refs;
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons

  // The storage for the key/value pair itself. The data is stored as:
  // [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
  // NOTE: the struct is over-allocated so the bytes run past this
  // 1-element array (classic variable-length-struct idiom).
  uint8_t kv_data[1];   // Beginning of key/value pair

  // Non-owning view of the key bytes.
  Slice key() const {
    return Slice(kv_data, key_length);
  }

  // Pointer to the value bytes, which begin at the first pointer-size
  // aligned offset past the key (KUDU_ALIGN_UP provides the padding).
  uint8_t* mutable_val_ptr() {
    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
    return &kv_data[val_offset];
  }

  const uint8_t* val_ptr() const {
    return const_cast<LRUHandle*>(this)->mutable_val_ptr();
  }

  // Non-owning view of the value bytes.
  Slice value() const {
    return Slice(val_ptr(), val_length);
  }
};

// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested. E.g., readrandom speeds up by ~5% over the g++
// 4.4.3's builtin hashtable.
+class HandleTable { + public: + HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); } + ~HandleTable() { delete[] list_; } + + LRUHandle* Lookup(const Slice& key, uint32_t hash) { + return *FindPointer(key, hash); + } + + LRUHandle* Insert(LRUHandle* h) { + LRUHandle** ptr = FindPointer(h->key(), h->hash); + LRUHandle* old = *ptr; + h->next_hash = (old == nullptr ? nullptr : old->next_hash); + *ptr = h; + if (old == nullptr) { + ++elems_; + if (elems_ > length_) { + // Since each cache entry is fairly large, we aim for a small + // average linked list length (<= 1). + Resize(); + } + } + return old; + } + + LRUHandle* Remove(const Slice& key, uint32_t hash) { + LRUHandle** ptr = FindPointer(key, hash); + LRUHandle* result = *ptr; + if (result != nullptr) { + *ptr = result->next_hash; + --elems_; + } + return result; + } + + private: + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. + uint32_t length_; + uint32_t elems_; + LRUHandle** list_; + + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. 
+  // Find the slot whose contents point to the entry matching 'key'/'hash'.
+  // If no such entry exists, returns a pointer to the trailing nullptr slot
+  // of the corresponding hash chain (i.e. where such an entry would go).
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != nullptr &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  // Grow the bucket array and rehash every existing entry into it.
+  // The array length is kept a power of two so that 'hash & (length - 1)'
+  // acts as a cheap modulus; it is grown until there are at least
+  // 1.5x as many buckets as entries.
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    auto new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != nullptr) {
+        LRUHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        // Prepend to the head of the new chain.
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// A single shard of sharded cache.
+class LRUCache {
+ public:
+  explicit LRUCache(MemTracker* tracker);
+  ~LRUCache();
+
+  // Separate from constructor so caller can easily make an array of LRUCache
+  void SetCapacity(size_t capacity) { capacity_ = capacity; }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
+
+  Cache::Handle* Insert(LRUHandle* handle, Cache::EvictionCallback* eviction_callback);
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+
+ private:
+  void LRU_Remove(LRUHandle* e);
+  void LRU_Append(LRUHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(LRUHandle* e);
+  // Call the user's eviction callback, if it exists, and free the entry.
+  void FreeEntry(LRUHandle* e);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  MemTracker* mem_tracker_;
+
+  CacheMetrics* metrics_;
+};
+
+LRUCache::LRUCache(MemTracker* tracker)
+  : usage_(0),
+    mem_tracker_(tracker),
+    metrics_(nullptr) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+LRUCache::~LRUCache() {
+  for (LRUHandle* e = lru_.next; e != &lru_; ) {
+    LRUHandle* next = e->next;
+    DCHECK_EQ(e->refs, 1);  // Error if caller has an unreleased handle
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+}
+
+bool LRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  return !base::RefCountDec(&e->refs);
+}
+
+void LRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  mem_tracker_->Release(e->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  // Entries are allocated as raw uint8_t buffers (see Allocate() below),
+  // so they are released the same way.
+  delete [] e;
+}
+
+void LRUCache::LRU_Remove(LRUHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  usage_ -= e->charge;
+}
+
+void LRUCache::LRU_Append(LRUHandle* e) {
+  // Make "e" newest entry by inserting just before lru_
+  e->next = &lru_;
+  e->prev = lru_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  LRUHandle* e;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      // Take a reference on behalf of the caller and move the entry to the
+      // newest end of the LRU list.
+      base::RefCountInc(&e->refs);
+      LRU_Remove(e);
+      LRU_Append(e);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    bool was_hit = (e != nullptr);
+    if (was_hit) {
+      if (caching) {
+        metrics_->cache_hits_caching->Increment();
+      } else {
+        metrics_->cache_hits->Increment();
+      }
+    } else {
+      if (caching) {
+        metrics_->cache_misses_caching->Increment();
+      } else {
+        metrics_->cache_misses->Increment();
+      }
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool last_reference = Unref(e);
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+Cache::Handle* LRUCache::Insert(LRUHandle* e, Cache::EvictionCallback *eviction_callback) {
+
+  // Set the remaining LRUHandle members which were not already allocated during
+  // Allocate().
+  e->eviction_callback = eviction_callback;
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  mem_tracker_->Consume(e->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  LRUHandle* to_remove_head = nullptr;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    LRU_Append(e);
+
+    // Inserting may displace an existing entry with the same key.
+    LRUHandle* old = table_.Insert(e);
+    if (old != nullptr) {
+      LRU_Remove(old);
+      if (Unref(old)) {
+        // Chain doomed entries through their (now unused) 'next' pointers.
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    // Evict oldest entries until we are back under capacity.
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      LRUHandle* old = lru_.next;
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  while (to_remove_head != nullptr) {
+    LRUHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      LRU_Remove(e);
+      last_reference = Unref(e);
+    }
+  }
+  // mutex not held here
+  // last_reference will only be true if e != NULL
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
+  return bits;
+}
+
+class ShardedLRUCache : public Cache {
+ private:
+  shared_ptr<MemTracker> mem_tracker_;
+  gscoped_ptr<CacheMetrics> metrics_;
+  vector<LRUCache*> shards_;
+  MutexType id_mutex_;
+  uint64_t last_id_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  static inline uint32_t HashSlice(const Slice& s) {
+    // Note: CityHash64 produces 64 bits; the value is implicitly truncated
+    // to the uint32_t return type here.
+    return util_hash::CityHash64(
+        reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& id)
+      : last_id_(0),
+        shard_bits_(DetermineShardBits()) {
+    // A cache is often a singleton, so:
+    // 1. We reuse its MemTracker if one already exists, and
+    // 2. It is directly parented to the root MemTracker.
+    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+        -1, strings::Substitute("$0-sharded_lru_cache", id));
+
+    // Split the total capacity evenly across shards, rounding up.
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+    for (int s = 0; s < num_shards; s++) {
+      gscoped_ptr<LRUCache> shard(new LRUCache(mem_tracker_.get()));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+  }
+
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    return shards_[Shard(h->hash)]->Insert(h, eviction_callback);
+  }
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+  }
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle)->value();
+  }
+  virtual uint64_t NewId() OVERRIDE {
+    std::lock_guard<MutexType> l(id_mutex_);
+    return ++(last_id_);
+  }
+
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    metrics_.reset(new CacheMetrics(entity));
+    for (LRUCache* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    // Key bytes are padded to pointer alignment so the value that follows
+    // them in kv_data starts aligned.
+    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+    uint8_t* buf = new uint8_t[sizeof(LRUHandle)
+                               + key_len_padded + val_len // the kv_data VLA data
+                               - 1 // (the VLA has a 1-byte placeholder)
+                              ];
+    LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
+    handle->key_length = key_len;
+    handle->val_length = val_len;
+    handle->charge = charge;
+    handle->hash = HashSlice(key);
+    memcpy(handle->kv_data, key.data(), key_len);
+
+    return reinterpret_cast<PendingHandle*>(handle);
+  }
+
+  virtual void Free(PendingHandle* h) OVERRIDE {
+    uint8_t* data = reinterpret_cast<uint8_t*>(h);
+    delete [] data;
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* h) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(h)->mutable_val_ptr();
+  }
+
+};
+
+} // end anonymous namespace
+
+Cache* NewLRUCache(CacheType type, size_t capacity, const string& id) {
+  switch (type) {
+    case DRAM_CACHE:
+      return new ShardedLRUCache(capacity, id);
+#if !defined(__APPLE__)
+    case NVM_CACHE:
+      return NewLRUNvmCache(capacity, id);
+#endif
+    default:
+      LOG(FATAL) << "Unsupported LRU cache type: " << type;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache.h b/be/src/kudu/util/cache.h
new file mode 100644
index 0000000..af0a9ae
--- /dev/null
+++ b/be/src/kudu/util/cache.h
@@ -0,0 +1,207 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A Cache is an interface that maps keys to values. It has internal
+// synchronization and may be safely accessed concurrently from
+// multiple threads. It may automatically evict entries to make room
+// for new entries. Values have a specified charge against the cache
+// capacity. For example, a cache where the values are variable
+// length strings, may use the length of the string as the charge for
+// the string.
+//
+// This is taken from LevelDB and evolved to fit the kudu codebase.
+//
+// TODO: this is pretty lock-heavy. Would be good to sub out something
+// a little more concurrent.
+
+#ifndef KUDU_UTIL_CACHE_H_
+#define KUDU_UTIL_CACHE_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+class Cache;
+struct CacheMetrics;
+class MetricEntity;
+
+enum CacheType {
+  DRAM_CACHE,
+  NVM_CACHE
+};
+
+// Create a new cache with a fixed size capacity. This implementation
+// of Cache uses a least-recently-used eviction policy.
+Cache* NewLRUCache(CacheType type, size_t capacity, const std::string& id);
+
+class Cache {
+ public:
+  // Callback interface which is called when an entry is evicted from the
+  // cache.
+  class EvictionCallback {
+   public:
+    virtual void EvictedEntry(Slice key, Slice value) = 0;
+    virtual ~EvictionCallback() {}
+  };
+
+  Cache() { }
+
+  // Destroys all existing entries by calling the "deleter"
+  // function that was passed to the constructor.
+  virtual ~Cache();
+
+  // Opaque handle to an entry stored in the cache.
+  struct Handle { };
+
+  // Custom handle "deleter", primarily intended for use with std::unique_ptr.
+  //
+  // Sample usage:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     unique_ptr<Cache::Handle, Cache::HandleDeleter> h(
+  //       cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  // Or:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  class HandleDeleter {
+   public:
+    explicit HandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::Handle* h) const {
+      c_->Release(h);
+    }
+
+   private:
+    Cache* c_;
+  };
+  typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
+  // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the number of times
+  // blocks were requested that the users were hoping to get the block from the cache, along
+  // with the basic metrics.
+  // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics.
+  // This helps in determining if we are effectively caching the blocks that matter the most.
+  enum CacheBehavior {
+    EXPECT_IN_CACHE,
+    NO_EXPECT_IN_CACHE
+  };
+
+  // If the cache has no mapping for "key", returns NULL.
+  //
+  // Else return a handle that corresponds to the mapping. The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed.
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) = 0;
+
+  // Release a mapping returned by a previous Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual void Release(Handle* handle) = 0;
+
+  // Return the value encapsulated in a handle returned by a
+  // successful Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual Slice Value(Handle* handle) = 0;
+
+  // If the cache contains an entry for key, erase it. Note that the
+  // underlying entry will be kept around until all existing handles
+  // to it have been released.
+  virtual void Erase(const Slice& key) = 0;
+
+  // Return a new numeric id. May be used by multiple clients who are
+  // sharing the same cache to partition the key space. Typically the
+  // client will allocate a new id at startup and prepend the id to
+  // its cache keys.
+  virtual uint64_t NewId() = 0;
+
+  // Pass a metric entity in order to start recording metrics.
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& metric_entity) = 0;
+
+  // ------------------------------------------------------------
+  // Insertion path
+  // ------------------------------------------------------------
+  //
+  // Because some cache implementations (eg NVM) manage their own memory, and because we'd
+  // like to read blocks directly into cache-managed memory rather than causing an extra
+  // memcpy, the insertion of a new element into the cache requires two phases. First, a
+  // PendingHandle is allocated with space for the value, and then it is later inserted.
+  //
+  // For example:
+  //
+  //   PendingHandle* ph = cache_->Allocate("my entry", value_size, charge);
+  //   if (!ReadDataFromDisk(cache_->MutableValue(ph)).ok()) {
+  //     cache_->Free(ph);
+  //     ... error handling ...
+  //     return;
+  //   }
+  //   Handle* h = cache_->Insert(ph, my_eviction_callback);
+  //   ...
+  //   cache_->Release(h);
+
+  // Opaque handle to an entry which is being prepared to be added to
+  // the cache.
+  struct PendingHandle { };
+
+  // Allocate space for a new entry to be inserted into the cache.
+  //
+  // The provided 'key' is copied into the resulting handle object.
+  // The allocated handle has enough space such that the value can
+  // be written into cache_->MutableValue(handle).
+  //
+  // Note that this does not mutate the cache itself: lookups will
+  // not be able to find the provided key until it is inserted.
+  //
+  // It is possible that this will return NULL if the cache is above its capacity
+  // and eviction fails to free up enough space for the requested allocation.
+  //
+  // NOTE: the returned memory is not automatically freed by the cache: the
+  // caller must either free it using Free(), or insert it using Insert().
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) = 0;
+
+  virtual uint8_t* MutableValue(PendingHandle* handle) = 0;
+
+  // Commit a prepared entry into the cache.
+  //
+  // Returns a handle that corresponds to the mapping. The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed. This method always succeeds and returns a non-null
+  // entry, since the space was reserved above.
+  //
+  // The 'pending' entry passed here should have been allocated using
+  // Cache::Allocate() above.
+  //
+  // If 'eviction_callback' is non-NULL, then it will be called when the
+  // entry is later evicted or when the cache shuts down.
+  virtual Handle* Insert(PendingHandle* pending, EvictionCallback* eviction_callback) = 0;
+
+  // Free 'ptr', which must have been previously allocated using 'Allocate'.
+  virtual void Free(PendingHandle* ptr) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Cache);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache_metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache_metrics.cc b/be/src/kudu/util/cache_metrics.cc
new file mode 100644
index 0000000..ac2fadf
--- /dev/null
+++ b/be/src/kudu/util/cache_metrics.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/cache_metrics.h"
+
+#include "kudu/util/metrics.h"
+
+METRIC_DEFINE_counter(server, block_cache_inserts,
+                      "Block Cache Inserts", kudu::MetricUnit::kBlocks,
+                      "Number of blocks inserted in the cache");
+METRIC_DEFINE_counter(server, block_cache_lookups,
+                      "Block Cache Lookups", kudu::MetricUnit::kBlocks,
+                      "Number of blocks looked up from the cache");
+METRIC_DEFINE_counter(server, block_cache_evictions,
+                      "Block Cache Evictions", kudu::MetricUnit::kBlocks,
+                      "Number of blocks evicted from the cache");
+METRIC_DEFINE_counter(server, block_cache_misses,
+                      "Block Cache Misses", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that didn't yield a block");
+// NOTE(review): in this description (and in block_cache_hits_caching below)
+// the first two adjacent string literals concatenate without a separating
+// space, producing e.g. "...didn't yield one.Use this number...". Left
+// unchanged here since it is runtime string content.
+METRIC_DEFINE_counter(server, block_cache_misses_caching,
+                      "Block Cache Misses (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that didn't yield one."
+                      "Use this number instead of cache_misses when trying to determine how "
+                      "efficient the cache is");
+METRIC_DEFINE_counter(server, block_cache_hits,
+                      "Block Cache Hits", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that found a block");
+METRIC_DEFINE_counter(server, block_cache_hits_caching,
+                      "Block Cache Hits (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that found one."
+                      "Use this number instead of cache_hits when trying to determine how "
+                      "efficient the cache is");
+
+METRIC_DEFINE_gauge_uint64(server, block_cache_usage, "Block Cache Memory Usage",
+                           kudu::MetricUnit::kBytes,
+                           "Memory consumed by the block cache");
+
+namespace kudu {
+
+#define MINIT(member, x) member(METRIC_##x.Instantiate(entity))
+#define GINIT(member, x) member(METRIC_##x.Instantiate(entity, 0))
+CacheMetrics::CacheMetrics(const scoped_refptr<MetricEntity>& entity)
+  : MINIT(inserts, block_cache_inserts),
+    MINIT(lookups, block_cache_lookups),
+    MINIT(evictions, block_cache_evictions),
+    MINIT(cache_hits, block_cache_hits),
+    MINIT(cache_hits_caching, block_cache_hits_caching),
+    MINIT(cache_misses, block_cache_misses),
+    MINIT(cache_misses_caching, block_cache_misses_caching),
+    GINIT(cache_usage, block_cache_usage) {
+}
+#undef MINIT
+#undef GINIT
+
+} // namespace kudu
