http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util-test.cc b/be/src/kudu/util/bit-util-test.cc
new file mode 100644
index 0000000..0d8eab4
--- /dev/null
+++ b/be/src/kudu/util/bit-util-test.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include "kudu/util/bit-util.h"
+
+namespace kudu {
+
+TEST(BitUtil, TrailingBits) {
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 1), 1);
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 64),
+            BOOST_BINARY(1 1 1 1 1 1 1 1));
+  EXPECT_EQ(BitUtil::TrailingBits(BOOST_BINARY(1 1 1 1 1 1 1 1), 100),
+            BOOST_BINARY(1 1 1 1 1 1 1 1));
+  EXPECT_EQ(BitUtil::TrailingBits(0, 1), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(0, 64), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 0), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 63), 0);
+  EXPECT_EQ(BitUtil::TrailingBits(1LL << 63, 64), 1LL << 63);
+
+}
+
+TEST(BitUtil, ShiftBits) {
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftLeftZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0xFFFFFFFF00000000ULL);
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(1ULL, 64), 0ULL);
+  EXPECT_EQ(BitUtil::ShiftRightZeroOnOverflow(0xFFFFFFFFFFFFFFFFULL, 32), 0x00000000FFFFFFFFULL);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-util.h b/be/src/kudu/util/bit-util.h
new file mode 100644
index 0000000..5f36887
--- /dev/null
+++ b/be/src/kudu/util/bit-util.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_BIT_UTIL_H
+#define IMPALA_BIT_UTIL_H
+
+#include <stdint.h>
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// Utility class to do standard bit tricks
+// TODO: is this in boost or something else like that?
+class BitUtil {
+ public:
+  // Returns the ceil of value/divisor
+  static inline int Ceil(int value, int divisor) {
+    return value / divisor + (value % divisor != 0);
+  }
+
+  // Returns the 'num_bits' least-significant bits of 'v'.
+  static inline uint64_t TrailingBits(uint64_t v, int num_bits) {
+    if (PREDICT_FALSE(num_bits == 0)) return 0;
+    if (PREDICT_FALSE(num_bits >= 64)) return v;
+    int n = 64 - num_bits;
+    return (v << n) >> n;
+  }
+
+  static inline uint64_t ShiftLeftZeroOnOverflow(uint64_t v, int num_bits) {
+    if (PREDICT_FALSE(num_bits >= 64)) return 0;
+    return v << num_bits;
+  }
+
+  static inline uint64_t ShiftRightZeroOnOverflow(uint64_t v, int num_bits) {
+    if (PREDICT_FALSE(num_bits >= 64)) return 0;
+    return v >> num_bits;
+  }
+
+
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap-test.cc b/be/src/kudu/util/bitmap-test.cc
new file mode 100644
index 0000000..a91e414
--- /dev/null
+++ b/be/src/kudu/util/bitmap-test.cc
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <vector>
+
+#include "kudu/gutil/strings/join.h"
+#include "kudu/util/bitmap.h"
+
+namespace kudu {
+
+static int ReadBackBitmap(uint8_t *bm, size_t bits,
+                           std::vector<size_t> *result) {
+  int iters = 0;
+  for (TrueBitIterator iter(bm, bits);
+       !iter.done();
+       ++iter) {
+    size_t val = *iter;
+    result->push_back(val);
+
+    iters++;
+  }
+  return iters;
+}
+
+TEST(TestBitMap, TestIteration) {
+  uint8_t bm[8];
+  memset(bm, 0, sizeof(bm));
+  BitmapSet(bm, 0);
+  BitmapSet(bm, 8);
+  BitmapSet(bm, 31);
+  BitmapSet(bm, 32);
+  BitmapSet(bm, 33);
+  BitmapSet(bm, 63);
+
+  EXPECT_EQ("   0: 10000000 10000000 00000000 00000001 11000000 00000000 00000000 00000001 \n",
+            BitmapToString(bm, sizeof(bm) * 8));
+
+  std::vector<size_t> read_back;
+
+  int iters = ReadBackBitmap(bm, sizeof(bm)*8, &read_back);
+  ASSERT_EQ(6, iters);
+  ASSERT_EQ("0,8,31,32,33,63", JoinElements(read_back, ","));
+}
+
+
+TEST(TestBitMap, TestIteration2) {
+  uint8_t bm[1];
+  memset(bm, 0, sizeof(bm));
+  BitmapSet(bm, 1);
+
+  std::vector<size_t> read_back;
+
+  int iters = ReadBackBitmap(bm, 3, &read_back);
+  ASSERT_EQ(1, iters);
+  ASSERT_EQ("1", JoinElements(read_back, ","));
+}
+
+TEST(TestBitmap, TestSetAndTestBits) {
+  uint8_t bm[1];
+  memset(bm, 0, sizeof(bm));
+
+  size_t num_bits = sizeof(bm) * 8;
+  for (size_t i = 0; i < num_bits; i++) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+
+    BitmapSet(bm, i);
+    ASSERT_TRUE(BitmapTest(bm, i));
+
+    BitmapClear(bm, i);
+    ASSERT_FALSE(BitmapTest(bm, i));
+
+    BitmapChange(bm, i, true);
+    ASSERT_TRUE(BitmapTest(bm, i));
+
+    BitmapChange(bm, i, false);
+    ASSERT_FALSE(BitmapTest(bm, i));
+  }
+
+  // Set the other bit: 01010101
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+    if (i & 1) BitmapSet(bm, i);
+  }
+
+  // Check and Clear the other bit: 0000000
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+    if (i & 1) BitmapClear(bm, i);
+  }
+
+  // Check if bits are zero and change the other to one
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_FALSE(BitmapTest(bm, i));
+    BitmapChange(bm, i, i & 1);
+  }
+
+  // Check the bits change them again
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!!(i & 1), BitmapTest(bm, i));
+    BitmapChange(bm, i, !(i & 1));
+  }
+
+  // Check the last setup
+  for (size_t i = 0; i < num_bits; ++i) {
+    ASSERT_EQ(!(i & 1), BitmapTest(bm, i));
+  }
+}
+
+TEST(TestBitMap, TestBulkSetAndTestBits) {
+  uint8_t bm[16];
+  size_t total_size = sizeof(bm) * 8;
+
+  // Test Bulk change bits and test bits
+  for (int i = 0; i < 4; ++i) {
+    bool value = i & 1;
+    size_t num_bits = total_size;
+    while (num_bits > 0) {
+      for (size_t offset = 0; offset < num_bits; ++offset) {
+        BitmapChangeBits(bm, 0, total_size, !value);
+        BitmapChangeBits(bm, offset, num_bits - offset, value);
+
+        ASSERT_EQ(value, BitMapIsAllSet(bm, offset, num_bits));
+        ASSERT_EQ(!value, BitmapIsAllZero(bm, offset, num_bits));
+
+        if (offset > 1) {
+          ASSERT_EQ(value, BitmapIsAllZero(bm, 0, offset - 1));
+          ASSERT_EQ(!value, BitMapIsAllSet(bm, 0, offset - 1));
+        }
+
+        if ((offset + num_bits) < total_size) {
+          ASSERT_EQ(value, BitmapIsAllZero(bm, num_bits, total_size));
+          ASSERT_EQ(!value, BitMapIsAllSet(bm, num_bits, total_size));
+        }
+      }
+      num_bits--;
+    }
+  }
+}
+
+TEST(TestBitMap, TestFindBit) {
+  uint8_t bm[16];
+
+  size_t num_bits = sizeof(bm) * 8;
+  BitmapChangeBits(bm, 0, num_bits, false);
+  while (num_bits > 0) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      size_t idx;
+      ASSERT_FALSE(BitmapFindFirstSet(bm, offset, num_bits, &idx));
+      ASSERT_TRUE(BitmapFindFirstZero(bm, offset, num_bits, &idx));
+      ASSERT_EQ(idx, offset);
+    }
+    num_bits--;
+  }
+
+  num_bits = sizeof(bm) * 8;
+  for (int i = 0; i < num_bits; ++i) {
+    BitmapChange(bm, i, i & 3);
+  }
+
+  while (num_bits--) {
+    for (size_t offset = 0; offset < num_bits; ++offset) {
+      size_t idx;
+
+      // Find a set bit
+      bool res = BitmapFindFirstSet(bm, offset, num_bits, &idx);
+      size_t expected_set_idx = (offset + !(offset & 3));
+      bool expect_set_found = (expected_set_idx < num_bits);
+      ASSERT_EQ(expect_set_found, res);
+      if (expect_set_found) ASSERT_EQ(expected_set_idx, idx);
+
+      // Find a zero bit
+      res = BitmapFindFirstZero(bm, offset, num_bits, &idx);
+      size_t expected_zero_idx = offset + ((offset & 3) ? (4 - (offset & 3)) : 0);
+      bool expect_zero_found = (expected_zero_idx < num_bits);
+      ASSERT_EQ(expect_zero_found, res);
+      if (expect_zero_found) ASSERT_EQ(expected_zero_idx, idx);
+    }
+  }
+}
+
+TEST(TestBitMap, TestBitmapIteration) {
+  uint8_t bm[8];
+  memset(bm, 0, sizeof(bm));
+  BitmapSet(bm, 0);
+  BitmapSet(bm, 8);
+  BitmapSet(bm, 31);
+  BitmapSet(bm, 32);
+  BitmapSet(bm, 33);
+  BitmapSet(bm, 63);
+
+  BitmapIterator biter(bm, sizeof(bm) * 8);
+
+  size_t i = 0;
+  size_t size;
+  bool value = false;
+  bool expected_value = true;
+  size_t expected_sizes[] = {1, 7, 1, 22, 3, 29, 1, 0};
+  while ((size = biter.Next(&value)) > 0) {
+    ASSERT_LT(i, 8);
+    ASSERT_EQ(expected_value, value);
+    ASSERT_EQ(expected_sizes[i], size);
+    expected_value = !expected_value;
+    i++;
+  }
+  ASSERT_EQ(expected_sizes[i], size);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.cc b/be/src/kudu/util/bitmap.cc
new file mode 100644
index 0000000..e38f2e2
--- /dev/null
+++ b/be/src/kudu/util/bitmap.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <glog/logging.h>
+#include <string>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/bitmap.h"
+
+namespace kudu {
+
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value) {
+  DCHECK_GT(num_bits, 0);
+
+  size_t start_byte = (offset >> 3);
+  size_t end_byte = (offset + num_bits - 1) >> 3;
+  int single_byte = (start_byte == end_byte);
+
+  // Change the last bits of the first byte
+  size_t left = offset & 0x7;
+  size_t right = (single_byte) ? (left + num_bits) : 8;
+  uint8_t mask = ((0xff << left) & (0xff >> (8 - right)));
+  if (value) {
+    bitmap[start_byte++] |= mask;
+  } else {
+    bitmap[start_byte++] &= ~mask;
+  }
+
+  // Nothing left... I'm done
+  if (single_byte) {
+    return;
+  }
+
+  // change the middle bits
+  if (end_byte > start_byte) {
+    const uint8_t pattern8[2] = { 0x00, 0xff };
+    memset(bitmap + start_byte, pattern8[value], end_byte - start_byte);
+  }
+
+  // change the first bits of the last byte
+  right = offset + num_bits - (end_byte << 3);
+  mask = (0xff >> (8 - right));
+  if (value) {
+    bitmap[end_byte] |= mask;
+  } else {
+    bitmap[end_byte] &= ~mask;
+  }
+}
+
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+                     bool value, size_t *idx) {
+  const uint64_t pattern64[2] = { 0xffffffffffffffff, 0x0000000000000000 };
+  const uint8_t pattern8[2] = { 0xff, 0x00 };
+  size_t bit;
+
+  DCHECK_LE(offset, bitmap_size);
+
+  // Jump to the byte at specified offset
+  const uint8_t *p = bitmap + (offset >> 3);
+  size_t num_bits = bitmap_size - offset;
+
+  // Find a 'value' bit at the end of the first byte
+  if ((bit = offset & 0x7)) {
+    for (; bit < 8 && num_bits > 0; ++bit) {
+      if (BitmapTest(p, bit) == value) {
+        *idx = ((p - bitmap) << 3) + bit;
+        return true;
+      }
+
+      num_bits--;
+    }
+
+    p++;
+  }
+
+  // check 64bit at the time for a 'value' bit
+  const uint64_t *u64 = (const uint64_t *)p;
+  while (num_bits >= 64 && *u64 == pattern64[value]) {
+    num_bits -= 64;
+    u64++;
+  }
+
+  // check 8bit at the time for a 'value' bit
+  p = (const uint8_t *)u64;
+  while (num_bits >= 8 && *p == pattern8[value]) {
+    num_bits -= 8;
+    p++;
+  }
+
+  // Find a 'value' bit at the beginning of the last byte
+  for (bit = 0; num_bits > 0; ++bit) {
+    if (BitmapTest(p, bit) == value) {
+      *idx = ((p - bitmap) << 3) + bit;
+      return true;
+    }
+    num_bits--;
+  }
+
+  return false;
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits) {
+  std::string s;
+  size_t index = 0;
+  while (index < num_bits) {
+    StringAppendF(&s, "%4zu: ", index);
+    for (int i = 0; i < 8 && index < num_bits; ++i) {
+      for (int j = 0; j < 8 && index < num_bits; ++j) {
+        StringAppendF(&s, "%d", BitmapTest(bitmap, index));
+        index++;
+      }
+      StringAppendF(&s, " ");
+    }
+    StringAppendF(&s, "\n");
+  }
+  return s;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bitmap.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bitmap.h b/be/src/kudu/util/bitmap.h
new file mode 100644
index 0000000..1689b50
--- /dev/null
+++ b/be/src/kudu/util/bitmap.h
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions for dealing with a byte array as if it were a bitmap.
+#ifndef KUDU_UTIL_BITMAP_H
+#define KUDU_UTIL_BITMAP_H
+
+#include <string>
+#include "kudu/gutil/bits.h"
+
+namespace kudu {
+
+// Return the number of bytes necessary to store the given number of bits.
+inline size_t BitmapSize(size_t num_bits) {
+  return (num_bits + 7) / 8;
+}
+
+// Set the given bit.
+inline void BitmapSet(uint8_t *bitmap, size_t idx) {
+  bitmap[idx >> 3] |= 1 << (idx & 7);
+}
+
+// Switch the given bit to the specified value.
+inline void BitmapChange(uint8_t *bitmap, size_t idx, bool value) {
+  bitmap[idx >> 3] = (bitmap[idx >> 3] & ~(1 << (idx & 7))) | ((!!value) << (idx & 7));
+}
+
+// Clear the given bit.
+inline void BitmapClear(uint8_t *bitmap, size_t idx) {
+  bitmap[idx >> 3] &= ~(1 << (idx & 7));
+}
+
+// Test/get the given bit.
+inline bool BitmapTest(const uint8_t *bitmap, size_t idx) {
+  return bitmap[idx >> 3] & (1 << (idx & 7));
+}
+
+// Merge the two bitmaps using bitwise or. Both bitmaps should have at least
+// n_bits valid bits.
+inline void BitmapMergeOr(uint8_t *dst, const uint8_t *src, size_t n_bits) {
+  size_t n_bytes = BitmapSize(n_bits);
+  for (size_t i = 0; i < n_bytes; i++) {
+    *dst++ |= *src++;
+  }
+}
+
+// Set bits from offset to (offset + num_bits) to the specified value
+void BitmapChangeBits(uint8_t *bitmap, size_t offset, size_t num_bits, bool value);
+
+// Find the first bit of the specified value, starting from the specified offset.
+bool BitmapFindFirst(const uint8_t *bitmap, size_t offset, size_t bitmap_size,
+                     bool value, size_t *idx);
+
+// Find the first set bit in the bitmap, at the specified offset.
+inline bool BitmapFindFirstSet(const uint8_t *bitmap, size_t offset,
+                               size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, true, idx);
+}
+
+// Find the first zero bit in the bitmap, at the specified offset.
+inline bool BitmapFindFirstZero(const uint8_t *bitmap, size_t offset,
+                                size_t bitmap_size, size_t *idx) {
+  return BitmapFindFirst(bitmap, offset, bitmap_size, false, idx);
+}
+
+// Returns true if the bitmap contains only ones.
+inline bool BitMapIsAllSet(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t idx;
+  return !BitmapFindFirstZero(bitmap, offset, bitmap_size, &idx);
+}
+
+// Returns true if the bitmap contains only zeros.
+inline bool BitmapIsAllZero(const uint8_t *bitmap, size_t offset, size_t bitmap_size) {
+  DCHECK_LT(offset, bitmap_size);
+  size_t idx;
+  return !BitmapFindFirstSet(bitmap, offset, bitmap_size, &idx);
+}
+
+std::string BitmapToString(const uint8_t *bitmap, size_t num_bits);
+
+// Iterator which yields ranges of set and unset bits.
+// Example usage:
+//   bool value;
+//   size_t size;
+//   BitmapIterator iter(bitmap, n_bits);
+//   while ((size = iter.Next(&value))) {
+//      printf("bitmap block len=%lu value=%d\n", size, value);
+//   }
+class BitmapIterator {
+ public:
+  BitmapIterator(const uint8_t *map, size_t num_bits)
+    : offset_(0), num_bits_(num_bits), map_(map)
+  {}
+
+  bool done() const {
+    return (num_bits_ - offset_) == 0;
+  }
+
+  void SeekTo(size_t bit) {
+    DCHECK_LE(bit, num_bits_);
+    offset_ = bit;
+  }
+
+  size_t Next(bool *value) {
+    size_t len = num_bits_ - offset_;
+    if (PREDICT_FALSE(len == 0))
+      return(0);
+
+    *value = BitmapTest(map_, offset_);
+
+    size_t index;
+    if (BitmapFindFirst(map_, offset_, num_bits_, !(*value), &index)) {
+      len = index - offset_;
+    } else {
+      index = num_bits_;
+    }
+
+    offset_ = index;
+    return len;
+  }
+
+ private:
+  size_t offset_;
+  size_t num_bits_;
+  const uint8_t *map_;
+};
+
+// Iterator which yields the set bits in a bitmap.
+// Example usage:
+//   for (TrueBitIterator iter(bitmap, n_bits);
+//        !iter.done();
+//        ++iter) {
+//     int next_onebit_position = *iter;
+//   }
+class TrueBitIterator {
+ public:
+  TrueBitIterator(const uint8_t *bitmap, size_t n_bits)
+    : bitmap_(bitmap),
+      cur_byte_(0),
+      cur_byte_idx_(0),
+      n_bits_(n_bits),
+      n_bytes_(BitmapSize(n_bits_)),
+      bit_idx_(0) {
+    if (n_bits_ == 0) {
+      cur_byte_idx_ = 1; // sets done
+    } else {
+      cur_byte_ = bitmap[0];
+      AdvanceToNextOneBit();
+    }
+  }
+
+  TrueBitIterator &operator ++() {
+    DCHECK(!done());
+    DCHECK(cur_byte_ & 1);
+    cur_byte_ &= (~1);
+    AdvanceToNextOneBit();
+    return *this;
+  }
+
+  bool done() const {
+    return cur_byte_idx_ >= n_bytes_;
+  }
+
+  size_t operator *() const {
+    DCHECK(!done());
+    return bit_idx_;
+  }
+
+ private:
+  void AdvanceToNextOneBit() {
+    while (cur_byte_ == 0) {
+      cur_byte_idx_++;
+      if (cur_byte_idx_ >= n_bytes_) return;
+      cur_byte_ = bitmap_[cur_byte_idx_];
+      bit_idx_ = cur_byte_idx_ * 8;
+    }
+    DVLOG(2) << "Found next nonzero byte at " << cur_byte_idx_
+             << " val=" << cur_byte_;
+
+    DCHECK_NE(cur_byte_, 0);
+    int set_bit = Bits::FindLSBSetNonZero(cur_byte_);
+    bit_idx_ += set_bit;
+    cur_byte_ >>= set_bit;
+  }
+
+  const uint8_t *bitmap_;
+  uint8_t cur_byte_;
+  uint8_t cur_byte_idx_;
+
+  const size_t n_bits_;
+  const size_t n_bytes_;
+  size_t bit_idx_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue-test.cc b/be/src/kudu/util/blocking_queue-test.cc
new file mode 100644
index 0000000..00f558a
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue-test.cc
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/blocking_queue.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+BlockingQueue<int32_t> test1_queue(5);
+
+void InsertSomeThings() {
+  ASSERT_EQ(test1_queue.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(test1_queue.Put(2), QUEUE_SUCCESS);
+  ASSERT_EQ(test1_queue.Put(3), QUEUE_SUCCESS);
+}
+
+TEST(BlockingQueueTest, Test1) {
+  thread inserter_thread(InsertSomeThings);
+  int32_t i;
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_EQ(1, i);
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_EQ(2, i);
+  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_EQ(3, i);
+  inserter_thread.join();
+}
+
+TEST(BlockingQueueTest, TestBlockingDrainTo) {
+  BlockingQueue<int32_t> test_queue(3);
+  ASSERT_EQ(test_queue.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
+  vector<int32_t> out;
+  ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30)));
+  ASSERT_EQ(1, out[0]);
+  ASSERT_EQ(2, out[1]);
+  ASSERT_EQ(3, out[2]);
+
+  // Set a deadline in the past and ensure we time out.
+  Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsTimedOut());
+
+  // Ensure that if the queue is shut down, we get Aborted status.
+  test_queue.Shutdown();
+  s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsAborted());
+}
+
+// Test that, when the queue is shut down with elements still pending,
+// Drain still returns OK until the elements are all gone.
+TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
+  // Put some elements into the queue and then shut it down.
+  BlockingQueue<int32_t> q(3);
+  ASSERT_EQ(q.Put(1), QUEUE_SUCCESS);
+  ASSERT_EQ(q.Put(2), QUEUE_SUCCESS);
+
+  q.Shutdown();
+
+  // Get() should still return an element.
+  int i;
+  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_EQ(1, i);
+
+  // Drain should still return OK, since it yielded elements.
+  vector<int32_t> out;
+  ASSERT_OK(q.BlockingDrainTo(&out));
+  ASSERT_EQ(2, out[0]);
+
+  // Now that it's empty, it should return Aborted.
+  Status s = q.BlockingDrainTo(&out);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+  ASSERT_FALSE(q.BlockingGet(&i));
+}
+
+TEST(BlockingQueueTest, TestTooManyInsertions) {
+  BlockingQueue<int32_t> test_queue(2);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_FULL);
+}
+
+namespace {
+
+struct LengthLogicalSize {
+  static size_t logical_size(const string& s) {
+    return s.length();
+  }
+};
+
+} // anonymous namespace
+
+TEST(BlockingQueueTest, TestLogicalSize) {
+  BlockingQueue<string, LengthLogicalSize> test_queue(4);
+  ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS);
+  ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL);
+}
+
+TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
+  BlockingQueue<int32_t> test_queue(1);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  // No DCHECK failure on destruct.
+}
+
+#ifndef NDEBUG
+TEST(BlockingQueueDeathTest, TestPointerParamsMustBeEmptyOnDestruct) {
+  ::testing::FLAGS_gtest_death_test_style = "threadsafe";
+  ASSERT_DEATH({
+      BlockingQueue<int32_t*> test_queue(1);
+      int32_t element = 123;
+      ASSERT_EQ(test_queue.Put(&element), QUEUE_SUCCESS);
+      // Debug assertion triggered on queue destruction since type is a pointer.
+    },
+    "BlockingQueue holds bare pointers");
+}
+#endif // NDEBUG
+
+TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
+  BlockingQueue<int64_t> test_queue(2);
+  ASSERT_EQ(test_queue.Put(123), QUEUE_SUCCESS);
+  test_queue.Shutdown();
+  ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
+  int64_t i;
+  ASSERT_TRUE(test_queue.BlockingGet(&i));
+  ASSERT_EQ(123, i);
+  ASSERT_FALSE(test_queue.BlockingGet(&i));
+}
+
+TEST(BlockingQueueTest, TestGscopedPtrMethods) {
+  BlockingQueue<int*> test_queue(2);
+  gscoped_ptr<int> input_int(new int(123));
+  ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
+  gscoped_ptr<int> output_int;
+  ASSERT_TRUE(test_queue.BlockingGet(&output_int));
+  ASSERT_EQ(123, *output_int.get());
+  test_queue.Shutdown();
+}
+
+class MultiThreadTest {
+ public:
+  MultiThreadTest()
+   :  puts_(4),
+      blocking_puts_(4),
+      nthreads_(5),
+      queue_(nthreads_ * puts_),
+      num_inserters_(nthreads_),
+      sync_latch_(nthreads_) {
+  }
+
+  void InserterThread(int arg) {
+    for (int i = 0; i < puts_; i++) {
+      ASSERT_EQ(queue_.Put(arg), QUEUE_SUCCESS);
+    }
+    sync_latch_.CountDown();
+    sync_latch_.Wait();
+    for (int i = 0; i < blocking_puts_; i++) {
+      ASSERT_TRUE(queue_.BlockingPut(arg));
+    }
+    MutexLock guard(lock_);
+    if (--num_inserters_ == 0) {
+      queue_.Shutdown();
+    }
+  }
+
+  void RemoverThread() {
+    for (int i = 0; i < puts_ + blocking_puts_; i++) {
+      int32_t arg = 0;
+      bool got = queue_.BlockingGet(&arg);
+      if (!got) {
+        arg = -1;
+      }
+      MutexLock guard(lock_);
+      gotten_[arg] = gotten_[arg] + 1;
+    }
+  }
+
+  void Run() {
+    for (int i = 0; i < nthreads_; i++) {
+      threads_.emplace_back(&MultiThreadTest::InserterThread, this, i);
+      threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    }
+    // We add an extra thread to ensure that there aren't enough elements in
+    // the queue to go around.  This way, we test removal after Shutdown.
+    threads_.emplace_back(&MultiThreadTest::RemoverThread, this);
+    for (auto& thread : threads_) {
+      thread.join();
+    }
+    // Let's check to make sure we got what we should have.
+    MutexLock guard(lock_);
+    for (int i = 0; i < nthreads_; i++) {
+      ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]);
+    }
+    // And there were nthreads_ * (puts_ + blocking_puts_)
+    // elements removed, but only nthreads_ * puts_ +
+    // blocking_puts_ elements added.  So some removers hit the
+    // shutdown case.
+    ASSERT_EQ(puts_ + blocking_puts_, gotten_[-1]);
+  }
+
+  int puts_;
+  int blocking_puts_;
+  int nthreads_;
+  BlockingQueue<int32_t> queue_;
+  Mutex lock_;
+  std::map<int32_t, int> gotten_;
+  vector<thread> threads_;
+  int num_inserters_;
+  CountDownLatch sync_latch_;
+};
+
+TEST(BlockingQueueTest, TestMultipleThreads) {
+  MultiThreadTest test;
+  test.Run();
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/blocking_queue.h b/be/src/kudu/util/blocking_queue.h
new file mode 100644
index 0000000..2907030
--- /dev/null
+++ b/be/src/kudu/util/blocking_queue.h
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
+#define KUDU_UTIL_BLOCKING_QUEUE_H
+
+#include <list>
+#include <string>
+#include <type_traits>
+#include <unistd.h>
+#include <vector>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Return values for BlockingQueue::Put()
+enum QueueStatus {
+  QUEUE_SUCCESS = 0,
+  QUEUE_SHUTDOWN = 1,
+  QUEUE_FULL = 2
+};
+
+// Default logical length implementation: always returns 1.
+struct DefaultLogicalSize {
+  template<typename T>
+  static size_t logical_size(const T& /* unused */) {
+    return 1;
+  }
+};
+
+template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
+class BlockingQueue {
+ public:
+  // If T is a pointer, this will be the base type.  If T is not a pointer, you
+  // can ignore this and the functions which make use of it.
+  // Template substitution failure is not an error.
+  typedef typename std::remove_pointer<T>::type T_VAL;
+
+  explicit BlockingQueue(size_t max_size)
+    : shutdown_(false),
+      size_(0),
+      max_size_(max_size),
+      not_empty_(&lock_),
+      not_full_(&lock_) {
+  }
+
+  // If the queue holds a bare pointer, it must be empty on destruction, since
+  // it may have ownership of the pointer.
+  ~BlockingQueue() {
+    DCHECK(list_.empty() || !std::is_pointer<T>::value)
+        << "BlockingQueue holds bare pointers at destruction time";
+  }
+
+  // Get an element from the queue.  Returns false if we were shut down prior 
to
+  // getting the element.
+  bool BlockingGet(T *out) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        *out = list_.front();
+        list_.pop_front();
+        decrement_size_unlocked(*out);
+        not_full_.Signal();
+        return true;
+      }
+      if (shutdown_) {
+        return false;
+      }
+      not_empty_.Wait();
+    }
+  }
+
+  // Get an element from the queue.  Returns false if the queue is empty and
+  // we were shut down prior to getting the element.
+  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
+    T t = NULL;
+    bool got_element = BlockingGet(&t);
+    if (!got_element) {
+      return false;
+    }
+    out->reset(t);
+    return true;
+  }
+
+  // Get all elements from the queue and append them to a vector.
+  //
+  // If 'deadline' passes and no elements have been returned from the
+  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+  // no deadline is used.
+  //
+  // If the queue has been shut down, but there are still elements waiting,
+  // then it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
+    MutexLock l(lock_);
+    while (true) {
+      if (!list_.empty()) {
+        out->reserve(list_.size());
+        for (const T& elt : list_) {
+          out->push_back(elt);
+          decrement_size_unlocked(elt);
+        }
+        list_.clear();
+        not_full_.Signal();
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(shutdown_)) {
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - 
MonoTime::Now()))) {
+        return Status::TimedOut("");
+      }
+    }
+  }
+
+  // Attempts to put the given value in the queue.
+  // Returns:
+  //   QUEUE_SUCCESS: if successfully inserted
+  //   QUEUE_FULL: if the queue has reached max_size
+  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
+  QueueStatus Put(const T &val) {
+    MutexLock l(lock_);
+    if (size_ >= max_size_) {
+      return QUEUE_FULL;
+    }
+    if (shutdown_) {
+      return QUEUE_SHUTDOWN;
+    }
+    list_.push_back(val);
+    increment_size_unlocked(val);
+    l.Unlock();
+    not_empty_.Signal();
+    return QUEUE_SUCCESS;
+  }
+
+  // Returns the same as the other Put() overload above.
+  // If the element was inserted, the gscoped_ptr releases its contents.
+  QueueStatus Put(gscoped_ptr<T_VAL> *val) {
+    QueueStatus s = Put(val->get());
+    if (s == QUEUE_SUCCESS) {
+      ignore_result<>(val->release());
+    }
+    return s;
+  }
+
+  // Gets an element for the queue; if the queue is full, blocks until
+  // space becomes available. Returns false if we were shutdown prior
+  // to enqueueing the element.
+  bool BlockingPut(const T& val) {
+    MutexLock l(lock_);
+    while (true) {
+      if (shutdown_) {
+        return false;
+      }
+      if (size_ < max_size_) {
+        list_.push_back(val);
+        increment_size_unlocked(val);
+        l.Unlock();
+        not_empty_.Signal();
+        return true;
+      }
+      not_full_.Wait();
+    }
+  }
+
+  // Same as other BlockingPut() overload above. If the element was
+  // enqueued, gscoped_ptr releases its contents.
+  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
+    bool ret = Put(val->get());
+    if (ret) {
+      ignore_result(val->release());
+    }
+    return ret;
+  }
+
+  // Shut down the queue.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown() {
+    MutexLock l(lock_);
+    shutdown_ = true;
+    not_full_.Broadcast();
+    not_empty_.Broadcast();
+  }
+
+  bool empty() const {
+    MutexLock l(lock_);
+    return list_.empty();
+  }
+
+  size_t max_size() const {
+    return max_size_;
+  }
+
+  std::string ToString() const {
+    std::string ret;
+
+    MutexLock l(lock_);
+    for (const T& t : list_) {
+      ret.append(t->ToString());
+      ret.append("\n");
+    }
+    return ret;
+  }
+
+ private:
+
+  // Increments queue size. Must be called when 'lock_' is held.
+  void increment_size_unlocked(const T& t) {
+    size_ += LOGICAL_SIZE::logical_size(t);
+  }
+
+  // Decrements queue size. Must be called when 'lock_' is held.
+  void decrement_size_unlocked(const T& t) {
+    size_ -= LOGICAL_SIZE::logical_size(t);
+  }
+
+  bool shutdown_;
+  size_t size_;
+  size_t max_size_;
+  mutable Mutex lock_;
+  ConditionVariable not_empty_;
+  ConditionVariable not_full_;
+  std::list<T> list_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter-test.cc 
b/be/src/kudu/util/bloom_filter-test.cc
new file mode 100644
index 0000000..3cee985
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter-test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <stdlib.h>
+#include "kudu/util/bloom_filter.h"
+
+namespace kudu {
+
+static const int kRandomSeed = 0xdeadbeef;
+
+static void AddRandomKeys(int random_seed, int n_keys, BloomFilterBuilder *bf) 
{
+  srandom(random_seed);
+  for (int i = 0; i < n_keys; i++) {
+    uint64_t key = random();
+    Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    BloomKeyProbe probe(key_slice);
+    bf->AddKey(probe);
+  }
+}
+
+static void CheckRandomKeys(int random_seed, int n_keys, const BloomFilter 
&bf) {
+  srandom(random_seed);
+  for (int i = 0; i < n_keys; i++) {
+    uint64_t key = random();
+    Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    BloomKeyProbe probe(key_slice);
+    ASSERT_TRUE(bf.MayContainKey(probe));
+  }
+}
+
+TEST(TestBloomFilter, TestInsertAndProbe) {
+  int n_keys = 2000;
+  BloomFilterBuilder bfb(
+    BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+
+  // Check that the desired false positive rate is achieved.
+  double expected_fp_rate = bfb.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate, 0.01, 0.002);
+
+  // 1% FP rate should need about 9 bits per key
+  ASSERT_EQ(9, bfb.n_bits() / n_keys);
+
+  // Enter n_keys random keys into the bloom filter
+  AddRandomKeys(kRandomSeed, n_keys, &bfb);
+
+  // Verify that the keys we inserted all return true when queried.
+  BloomFilter bf(bfb.slice(), bfb.n_hashes());
+  CheckRandomKeys(kRandomSeed, n_keys, bf);
+
+  // Query a bunch of other keys, and verify the false positive rate
+  // is within reasonable bounds.
+  uint32_t num_queries = 100000;
+  uint32_t num_positives = 0;
+  for (int i = 0; i < num_queries; i++) {
+    uint64_t key = random();
+    Slice key_slice(reinterpret_cast<const uint8_t *>(&key), sizeof(key));
+    BloomKeyProbe probe(key_slice);
+    if (bf.MayContainKey(probe)) {
+      num_positives++;
+    }
+  }
+
+  double fp_rate = static_cast<double>(num_positives) / 
static_cast<double>(num_queries);
+  LOG(INFO) << "FP rate: " << fp_rate << " (" << num_positives << "/" << 
num_queries << ")";
+  LOG(INFO) << "Expected FP rate: " << expected_fp_rate;
+
+  // Actual FP rate should be within 20% of the estimated FP rate
+  ASSERT_NEAR(fp_rate, expected_fp_rate, 0.20*expected_fp_rate);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.cc b/be/src/kudu/util/bloom_filter.cc
new file mode 100644
index 0000000..2b48b0d
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <math.h>
+
+#include "kudu/util/bloom_filter.h"
+#include "kudu/util/bitmap.h"
+
+namespace kudu {
+
// ln(2), used by the standard Bloom filter sizing formulas.
// Made const: it is never modified, and a mutable file-scope global invites
// accidental writes.
static const double kNaturalLog2 = 0.69314;

// Returns the optimal number of hash functions for a filter of 'n_bits' bits
// holding 'elems' entries: k = (m/n) * ln 2, truncated, clamped to >= 1.
static int ComputeOptimalHashCount(size_t n_bits, size_t elems) {
  int n_hashes = n_bits * kNaturalLog2 / elems;
  if (n_hashes < 1) n_hashes = 1;
  return n_hashes;
}
+
+BloomFilterSizing BloomFilterSizing::ByCountAndFPRate(
+  size_t expected_count, double fp_rate) {
+  CHECK_GT(fp_rate, 0);
+  CHECK_LT(fp_rate, 1);
+
+  double n_bits = -static_cast<double>(expected_count) * log(fp_rate)
+    / kNaturalLog2 / kNaturalLog2;
+  int n_bytes = static_cast<int>(ceil(n_bits / 8));
+  CHECK_GT(n_bytes, 0)
+    << "expected_count: " << expected_count
+    << " fp_rate: " << fp_rate;
+  return BloomFilterSizing(n_bytes, expected_count);
+}
+
+BloomFilterSizing BloomFilterSizing::BySizeAndFPRate(size_t n_bytes, double 
fp_rate) {
+  size_t n_bits = n_bytes * 8;
+  double expected_elems = -static_cast<double>(n_bits) * kNaturalLog2 * 
kNaturalLog2 /
+    log(fp_rate);
+  DCHECK_GT(expected_elems, 1);
+  return BloomFilterSizing(n_bytes, (size_t)ceil(expected_elems));
+}
+
+
// Allocate the bit array per 'sizing', derive the optimal hash count, and
// start from an empty (all-zero) filter.
BloomFilterBuilder::BloomFilterBuilder(const BloomFilterSizing &sizing)
  : n_bits_(sizing.n_bytes() * 8),
    bitmap_(new uint8_t[sizing.n_bytes()]),
    n_hashes_(ComputeOptimalHashCount(n_bits_, sizing.expected_count())),
    expected_count_(sizing.expected_count()),
    n_inserted_(0) {
  Clear();
}
+
// Reset to the empty state: zero every bit and the insertion counter.
void BloomFilterBuilder::Clear() {
  memset(&bitmap_[0], 0, n_bytes());
  n_inserted_ = 0;
}
+
// Estimated FP rate: (1 - e^(-k*n/m))^k with k = n_hashes_,
// n = expected_count_, m = n_bits_.
double BloomFilterBuilder::false_positive_rate() const {
  CHECK_NE(expected_count_, 0)
    << "expected_count_ not initialized: can't call this function on "
    << "a BloomFilter initialized from external data";

  return pow(1 - exp(-static_cast<double>(n_hashes_) * expected_count_ / n_bits_), n_hashes_);
}
+
// Wrap an existing serialized filter. The bitmap is NOT copied: 'data' must
// remain valid for the lifetime of this object.
BloomFilter::BloomFilter(const Slice &data, size_t n_hashes)
  : n_bits_(data.size() * 8),
    bitmap_(reinterpret_cast<const uint8_t *>(data.data())),
    n_hashes_(n_hashes)
{}
+
+
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/bloom_filter.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bloom_filter.h b/be/src/kudu/util/bloom_filter.h
new file mode 100644
index 0000000..419de3b
--- /dev/null
+++ b/be/src/kudu/util/bloom_filter.h
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_BLOOM_FILTER_H
+#define KUDU_UTIL_BLOOM_FILTER_H
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/bitmap.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+// Probe calculated from a given key. This caches the calculated
+// hash values which are necessary for probing into a Bloom Filter,
+// so that when many bloom filters have to be consulted for a given
+// key, we only need to calculate the hashes once.
+//
+// This is implemented based on the idea of double-hashing from the following 
paper:
+//   "Less Hashing, Same Performance: Building a Better Bloom Filter"
+//   Kirsch and Mitzenmacher, ESA 2006
+//   https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
+//
+// Currently, the implementation uses the 64-bit City Hash.
+// TODO: an SSE CRC32 hash is probably ~20% faster. Come back to this
+// at some point.
class BloomKeyProbe {
 public:
  // Default constructor - this is only used to instantiate an object
  // and later reassign by assignment from another instance
  BloomKeyProbe() {}

  // Construct a probe from the given key.
  //
  // NOTE: proper operation requires that the referenced memory remain
  // valid for the lifetime of this object.
  explicit BloomKeyProbe(const Slice &key) : key_(key) {
    uint64_t h = util_hash::CityHash64(
      reinterpret_cast<const char *>(key.data()),
      key.size());

    // Use the top and bottom halves of the 64-bit hash
    // as the two independent hash functions for mixing.
    h_1_ = static_cast<uint32>(h);
    h_2_ = static_cast<uint32>(h >> 32);
  }

  const Slice &key() const { return key_; }

  // The initial hash value. See MixHash() for usage example.
  uint32_t initial_hash() const {
    return h_1_;
  }

  // Mix the given hash function with the second calculated hash
  // value. A sequence of independent hashes can be calculated
  // by repeatedly calling MixHash() on its previous result.
  uint32_t MixHash(uint32_t h) const {
    return h + h_2_;
  }

 private:
  // Non-owning view of the probed key's bytes.
  Slice key_;

  // The two hashes.
  uint32_t h_1_;  // Low 32 bits of CityHash64(key); also the initial hash.
  uint32_t h_2_;  // High 32 bits; the per-step increment used by MixHash().
};
+
+// Sizing parameters for the constructor to BloomFilterBuilder.
+// This is simply to provide a nicer API than a bunch of overloaded
+// constructors.
class BloomFilterSizing {
 public:
  // Size the bloom filter by a fixed size and false positive rate.
  //
  // Picks the number of entries to achieve the above.
  static BloomFilterSizing BySizeAndFPRate(size_t n_bytes, double fp_rate);

  // Size the bloom filer by an expected count and false positive rate.
  //
  // Picks the number of bytes to achieve the above.
  static BloomFilterSizing ByCountAndFPRate(size_t expected_count, double fp_rate);

  size_t n_bytes() const { return n_bytes_; }
  size_t expected_count() const { return expected_count_; }

 private:
  // Private: instances are only created through the factory methods above.
  BloomFilterSizing(size_t n_bytes, size_t expected_count) :
    n_bytes_(n_bytes),
    expected_count_(expected_count)
  {}

  size_t n_bytes_;         // Size of the filter's bit array, in bytes.
  size_t expected_count_;  // Number of entries the filter is optimized for.
};
+
+
// Builder for a BloomFilter structure.
class BloomFilterBuilder {
 public:
  // Create a bloom filter.
  // See BloomFilterSizing static methods to specify this argument.
  explicit BloomFilterBuilder(const BloomFilterSizing &sizing);

  // Clear all entries, reset insertion count.
  void Clear();

  // Add the given key to the bloom filter.
  void AddKey(const BloomKeyProbe &probe);

  // Return an estimate of the false positive rate.
  double false_positive_rate() const;

  int n_bytes() const {
    return n_bits_ / 8;
  }

  int n_bits() const {
    return n_bits_;
  }

  // Return a slice view into this Bloom Filter, suitable for
  // writing out to a file.
  // NOTE: the slice points into this builder's buffer; it is invalidated
  // when the builder is destroyed.
  const Slice slice() const {
    return Slice(&bitmap_[0], n_bytes());
  }

  // Return the number of hashes that are calculated for each entry
  // in the bloom filter.
  size_t n_hashes() const { return n_hashes_; }

  size_t expected_count() const { return expected_count_; }

  // Return the number of keys inserted.
  size_t count() const { return n_inserted_; }

 private:
  DISALLOW_COPY_AND_ASSIGN(BloomFilterBuilder);

  size_t n_bits_;
  // Owned bit array of n_bits_ / 8 bytes.
  gscoped_array<uint8_t> bitmap_;

  // The number of hash functions to compute.
  size_t n_hashes_;

  // The expected number of elements, for which the bloom is optimized.
  size_t expected_count_;

  // The number of elements inserted so far since the last Reset.
  size_t n_inserted_;
};
+
+
// Wrapper around a byte array for reading it as a bloom filter.
class BloomFilter {
 public:
  // Default-constructed filter is unusable until assigned from a real one.
  BloomFilter() : bitmap_(nullptr) {}
  BloomFilter(const Slice &data, size_t n_hashes);

  // Return true if the filter may contain the given key.
  bool MayContainKey(const BloomKeyProbe &probe) const;

 private:
  friend class BloomFilterBuilder;
  // Map a hash value to a bit index in [0, n_bits).
  static uint32_t PickBit(uint32_t hash, size_t n_bits);

  size_t n_bits_;
  // Non-owning pointer into externally-managed filter data.
  const uint8_t *bitmap_;

  size_t n_hashes_;
};
+
+
+////////////////////////////////////////////////////////////
+// Inline implementations
+////////////////////////////////////////////////////////////
+
+inline uint32_t BloomFilter::PickBit(uint32_t hash, size_t n_bits) {
+  switch (n_bits) {
+    // Fast path for the default bloom filter block size. Bitwise math
+    // is much faster than division.
+    case 4096 * 8:
+      return hash & (n_bits - 1);
+
+    default:
+      return hash % n_bits;
+  }
+}
+
+inline void BloomFilterBuilder::AddKey(const BloomKeyProbe &probe) {
+  uint32_t h = probe.initial_hash();
+  for (size_t i = 0; i < n_hashes_; i++) {
+    uint32_t bitpos = BloomFilter::PickBit(h, n_bits_);
+    BitmapSet(&bitmap_[0], bitpos);
+    h = probe.MixHash(h);
+  }
+  n_inserted_++;
+}
+
// Check every hash position for 'probe'. A single clear bit proves the key
// was never inserted; all bits set means "maybe present".
inline bool BloomFilter::MayContainKey(const BloomKeyProbe &probe) const {
  uint32_t h = probe.initial_hash();

  // Basic unrolling by 2s gives a small benefit here since the two bit positions
  // can be calculated in parallel -- it's a 50% chance that the first will be
  // set even if it's a bloom miss, in which case we can parallelize the load.
  int rem_hashes = n_hashes_;
  while (rem_hashes >= 2) {
    uint32_t bitpos1 = PickBit(h, n_bits_);
    h = probe.MixHash(h);
    uint32_t bitpos2 = PickBit(h, n_bits_);
    h = probe.MixHash(h);

    if (!BitmapTest(&bitmap_[0], bitpos1) ||
        !BitmapTest(&bitmap_[0], bitpos2)) {
      return false;
    }

    rem_hashes -= 2;
  }

  // Handle the odd remaining hash (if n_hashes_ is odd).
  while (rem_hashes) {
    uint32_t bitpos = PickBit(h, n_bits_);
    if (!BitmapTest(&bitmap_[0], bitpos)) {
      return false;
    }
    h = probe.MixHash(h);
    rem_hashes--;
  }
  return true;
}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/boost_mutex_utils.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/boost_mutex_utils.h 
b/be/src/kudu/util/boost_mutex_utils.h
new file mode 100644
index 0000000..6f6390b
--- /dev/null
+++ b/be/src/kudu/util/boost_mutex_utils.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_BOOST_MUTEX_UTILS_H
+#define KUDU_BOOST_MUTEX_UTILS_H
+
+
// Similar to std::lock_guard except that it takes
// a lock pointer, and checks against nullptr. If the
// pointer is NULL, does nothing. Otherwise guards
// with the lock.
template<class LockType>
class lock_guard_maybe {
 public:
  explicit lock_guard_maybe(LockType *l) :
    lock_(l) {
    if (l != nullptr) {
      l->lock();
    }
  }

  ~lock_guard_maybe() {
    if (lock_ != nullptr) {
      lock_->unlock();
    }
  }

  // Copying a guard would unlock the same lock twice (and allow an unlock
  // from a scope that never locked), so copy and assignment are forbidden.
  lock_guard_maybe(const lock_guard_maybe&) = delete;
  lock_guard_maybe& operator=(const lock_guard_maybe&) = delete;

 private:
  LockType *lock_;  // Guarded lock, or nullptr for the no-op case.
};
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache-test.cc b/be/src/kudu/util/cache-test.cc
new file mode 100644
index 0000000..7319cf9
--- /dev/null
+++ b/be/src/kudu/util/cache-test.cc
@@ -0,0 +1,234 @@
+// Some portions Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <memory>
+
+#include <vector>
+#include "kudu/util/cache.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_util.h"
+
+#if defined(__linux__)
+DECLARE_string(nvm_cache_path);
+#endif // defined(__linux__)
+
+namespace kudu {
+
+// Conversions between numeric keys/values and the types expected by Cache.
+static std::string EncodeInt(int k) {
+  faststring result;
+  PutFixed32(&result, k);
+  return result.ToString();
+}
+static int DecodeInt(const Slice& k) {
+  assert(k.size() == 4);
+  return DecodeFixed32(k.data());
+}
+
// Fixture parameterized over the cache backend (DRAM always; NVM on Linux).
// The fixture itself implements the eviction callback so tests can observe
// exactly which key/value pairs were evicted, and in what order.
class CacheTest : public KuduTest,
                  public ::testing::WithParamInterface<CacheType>,
                  public Cache::EvictionCallback {
 public:

  // Implementation of the EvictionCallback interface
  void EvictedEntry(Slice key, Slice val) override {
    evicted_keys_.push_back(DecodeInt(key));
    evicted_values_.push_back(DecodeInt(val));
  }
  // Eviction history, in eviction order.
  std::vector<int> evicted_keys_;
  std::vector<int> evicted_values_;
  std::shared_ptr<MemTracker> mem_tracker_;
  gscoped_ptr<Cache> cache_;
  MetricRegistry metric_registry_;

  // Total cache capacity used by every test.
  static const int kCacheSize = 14*1024*1024;

  virtual void SetUp() OVERRIDE {

#if defined(__linux__)
    // Default the NVM cache path into the test's scratch directory unless the
    // user supplied one on the command line.
    if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
      FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
      ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
    }
#endif // defined(__linux__)

    cache_.reset(NewLRUCache(GetParam(), kCacheSize, "cache_test"));

    MemTracker::FindTracker("cache_test-sharded_lru_cache", &mem_tracker_);
    // Since nvm cache does not have memtracker due to the use of
    // tcmalloc for this we only check for it in the DRAM case.
    if (GetParam() == DRAM_CACHE) {
      ASSERT_TRUE(mem_tracker_.get());
    }

    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(
        &metric_registry_, "test");
    cache_->SetMetrics(entity);
  }

  // Look up 'key'; returns the decoded value, or -1 on a cache miss.
  int Lookup(int key) {
    Cache::Handle* handle = cache_->Lookup(EncodeInt(key), Cache::EXPECT_IN_CACHE);
    const int r = (handle == nullptr) ? -1 : DecodeInt(cache_->Value(handle));
    if (handle != nullptr) {
      cache_->Release(handle);
    }
    return r;
  }

  // Insert key -> value with the given 'charge' (its cost against capacity).
  void Insert(int key, int value, int charge = 1) {
    string key_str = EncodeInt(key);
    string val_str = EncodeInt(value);
    Cache::PendingHandle* handle = CHECK_NOTNULL(cache_->Allocate(key_str, val_str.size(), charge));
    memcpy(cache_->MutableValue(handle), val_str.data(), val_str.size());

    cache_->Release(cache_->Insert(handle, this));
  }

  void Erase(int key) {
    cache_->Erase(EncodeInt(key));
  }
};
+
+#if defined(__linux__)
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE, 
NVM_CACHE));
+#else
+INSTANTIATE_TEST_CASE_P(CacheTypes, CacheTest, ::testing::Values(DRAM_CACHE));
+#endif // defined(__linux__)
+
// Memory consumption must follow insert/erase, and record the peak.
// Guarded on mem_tracker_: the NVM cache variant has none (see SetUp()).
TEST_P(CacheTest, TrackMemory) {
  if (mem_tracker_) {
    Insert(100, 100, 1);
    ASSERT_EQ(1, mem_tracker_->consumption());
    Erase(100);
    ASSERT_EQ(0, mem_tracker_->consumption());
    ASSERT_EQ(1, mem_tracker_->peak_consumption());
  }
}
+
// Basic hit/miss semantics, plus: re-inserting an existing key replaces the
// value and reports exactly one eviction of the old value.
TEST_P(CacheTest, HitAndMiss) {
  ASSERT_EQ(-1, Lookup(100));

  Insert(100, 101);
  ASSERT_EQ(101, Lookup(100));
  ASSERT_EQ(-1,  Lookup(200));
  ASSERT_EQ(-1,  Lookup(300));

  Insert(200, 201);
  ASSERT_EQ(101, Lookup(100));
  ASSERT_EQ(201, Lookup(200));
  ASSERT_EQ(-1,  Lookup(300));

  // Overwrite key 100: the old (100, 101) pair should be evicted.
  Insert(100, 102);
  ASSERT_EQ(102, Lookup(100));
  ASSERT_EQ(201, Lookup(200));
  ASSERT_EQ(-1,  Lookup(300));

  ASSERT_EQ(1, evicted_keys_.size());
  ASSERT_EQ(100, evicted_keys_[0]);
  ASSERT_EQ(101, evicted_values_[0]);
}
+
// Erase removes the entry and reports its eviction exactly once; erasing a
// key that is absent (or already erased) is a no-op.
TEST_P(CacheTest, Erase) {
  Erase(200);
  ASSERT_EQ(0, evicted_keys_.size());

  Insert(100, 101);
  Insert(200, 201);
  Erase(100);
  ASSERT_EQ(-1,  Lookup(100));
  ASSERT_EQ(201, Lookup(200));
  ASSERT_EQ(1, evicted_keys_.size());
  ASSERT_EQ(100, evicted_keys_[0]);
  ASSERT_EQ(101, evicted_values_[0]);

  // Second erase of the same key: no additional eviction recorded.
  Erase(100);
  ASSERT_EQ(-1,  Lookup(100));
  ASSERT_EQ(201, Lookup(200));
  ASSERT_EQ(1, evicted_keys_.size());
}
+
// Entries with outstanding handles must not have their eviction reported
// until every handle on them is released.
TEST_P(CacheTest, EntriesArePinned) {
  Insert(100, 101);
  Cache::Handle* h1 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
  ASSERT_EQ(101, DecodeInt(cache_->Value(h1)));

  // Overwriting the key while h1 is held: old value stays pinned.
  Insert(100, 102);
  Cache::Handle* h2 = cache_->Lookup(EncodeInt(100), Cache::EXPECT_IN_CACHE);
  ASSERT_EQ(102, DecodeInt(cache_->Value(h2)));
  ASSERT_EQ(0, evicted_keys_.size());

  // Releasing the last handle on the old value triggers its eviction.
  cache_->Release(h1);
  ASSERT_EQ(1, evicted_keys_.size());
  ASSERT_EQ(100, evicted_keys_[0]);
  ASSERT_EQ(101, evicted_values_[0]);

  // Erase while h2 is held: lookup misses, but no eviction yet.
  Erase(100);
  ASSERT_EQ(-1, Lookup(100));
  ASSERT_EQ(1, evicted_keys_.size());

  cache_->Release(h2);
  ASSERT_EQ(2, evicted_keys_.size());
  ASSERT_EQ(100, evicted_keys_[1]);
  ASSERT_EQ(102, evicted_values_[1]);
}
+
// A frequently-accessed entry must survive heavy insertion pressure, while a
// never-touched entry gets evicted.
TEST_P(CacheTest, EvictionPolicy) {
  Insert(100, 101);
  Insert(200, 201);

  const int kNumElems = 1000;
  const int kSizePerElem = kCacheSize / kNumElems;

  // Loop adding and looking up new entries, but repeatedly accessing key 101. This
  // frequently-used entry should not be evicted.
  for (int i = 0; i < kNumElems + 1000; i++) {
    Insert(1000+i, 2000+i, kSizePerElem);
    ASSERT_EQ(2000+i, Lookup(1000+i));
    ASSERT_EQ(101, Lookup(100));
  }
  ASSERT_EQ(101, Lookup(100));
  // Since '200' wasn't accessed in the loop above, it should have
  // been evicted.
  ASSERT_EQ(-1, Lookup(200));
}
+
+TEST_P(CacheTest, HeavyEntries) {
+  // Add a bunch of light and heavy entries and then count the combined
+  // size of items still in the cache, which must be approximately the
+  // same as the total capacity.
+  const int kLight = kCacheSize/1000;
+  const int kHeavy = kCacheSize/100;
+  int added = 0;
+  int index = 0;
+  while (added < 2*kCacheSize) {
+    const int weight = (index & 1) ? kLight : kHeavy;
+    Insert(index, 1000+index, weight);
+    added += weight;
+    index++;
+  }
+
+  int cached_weight = 0;
+  for (int i = 0; i < index; i++) {
+    const int weight = (i & 1 ? kLight : kHeavy);
+    int r = Lookup(i);
+    if (r >= 0) {
+      cached_weight += weight;
+      ASSERT_EQ(1000+i, r);
+    }
+  }
+  ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10);
+}
+
+TEST_P(CacheTest, NewId) {
+  uint64_t a = cache_->NewId();
+  uint64_t b = cache_->NewId();
+  ASSERT_NE(a, b);
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache.cc b/be/src/kudu/util/cache.cc
new file mode 100644
index 0000000..4700fb3
--- /dev/null
+++ b/be/src/kudu/util/cache.cc
@@ -0,0 +1,512 @@
+// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdlib>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/bits.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+
+#if !defined(__APPLE__)
+#include "kudu/util/nvm_cache.h"
+#endif
+
+// Useful in tests that require accurate cache capacity accounting.
+DEFINE_bool(cache_force_single_shard, false,
+            "Override all cache implementations to use just one shard");
+TAG_FLAG(cache_force_single_shard, hidden);
+
+namespace kudu {
+
+class MetricEntity;
+
+Cache::~Cache() {
+}
+
+namespace {
+
+using std::shared_ptr;
+using std::vector;
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// An entry is a variable length heap-allocated structure.  Entries
+// are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;
+  LRUHandle* next_hash;
+  LRUHandle* next;
+  LRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  Atomic32 refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+
+  // The storage for the key/value pair itself. The data is stored as:
+  //   [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
+  uint8_t kv_data[1];   // Beginning of key/value pair
+
+  Slice key() const {
+    return Slice(kv_data, key_length);
+  }
+
+  uint8_t* mutable_val_ptr() {
+    int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
+    return &kv_data[val_offset];
+  }
+
+  const uint8_t* val_ptr() const {
+    return const_cast<LRUHandle*>(this)->mutable_val_ptr();
+  }
+
+  Slice value() const {
+    return Slice(val_ptr(), val_length);
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  LRUHandle* Insert(LRUHandle* h) {
+    LRUHandle** ptr = FindPointer(h->key(), h->hash);
+    LRUHandle* old = *ptr;
+    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
+    *ptr = h;
+    if (old == nullptr) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = FindPointer(key, hash);
+    LRUHandle* result = *ptr;
+    if (result != nullptr) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;
+  uint32_t elems_;
+  LRUHandle** list_;
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];
+    while (*ptr != nullptr &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    auto new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != nullptr) {
+        LRUHandle* next = h->next_hash;
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// A single shard of sharded cache.
+class LRUCache {
+ public:
+  explicit LRUCache(MemTracker* tracker);
+  ~LRUCache();
+
+  // Separate from constructor so caller can easily make an array of LRUCache
+  void SetCapacity(size_t capacity) { capacity_ = capacity; }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
+
+  Cache::Handle* Insert(LRUHandle* handle, Cache::EvictionCallback* eviction_callback);
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+
+ private:
+  void LRU_Remove(LRUHandle* e);
+  void LRU_Append(LRUHandle* e);
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(LRUHandle* e);
+  // Call the user's eviction callback, if it exists, and free the entry.
+  void FreeEntry(LRUHandle* e);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  MemTracker* mem_tracker_;
+
+  CacheMetrics* metrics_;
+};
+
+LRUCache::LRUCache(MemTracker* tracker)
+ : usage_(0),
+   mem_tracker_(tracker),
+   metrics_(nullptr) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+LRUCache::~LRUCache() {
+  for (LRUHandle* e = lru_.next; e != &lru_; ) {
+    LRUHandle* next = e->next;
+    DCHECK_EQ(e->refs, 1);  // Error if caller has an unreleased handle
+    if (Unref(e)) {
+      FreeEntry(e);
+    }
+    e = next;
+  }
+}
+
+bool LRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  return !base::RefCountDec(&e->refs);
+}
+
+void LRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  if (e->eviction_callback) {
+    e->eviction_callback->EvictedEntry(e->key(), e->value());
+  }
+  mem_tracker_->Release(e->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  delete [] e;
+}
+
+void LRUCache::LRU_Remove(LRUHandle* e) {
+  e->next->prev = e->prev;
+  e->prev->next = e->next;
+  usage_ -= e->charge;
+}
+
+void LRUCache::LRU_Append(LRUHandle* e) {
+  // Make "e" newest entry by inserting just before lru_
+  e->next = &lru_;
+  e->prev = lru_.prev;
+  e->prev->next = e;
+  e->next->prev = e;
+  usage_ += e->charge;
+}
+
+Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  LRUHandle* e;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      base::RefCountInc(&e->refs);
+      LRU_Remove(e);
+      LRU_Append(e);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    bool was_hit = (e != nullptr);
+    if (was_hit) {
+      if (caching) {
+        metrics_->cache_hits_caching->Increment();
+      } else {
+        metrics_->cache_hits->Increment();
+      }
+    } else {
+      if (caching) {
+        metrics_->cache_misses_caching->Increment();
+      } else {
+        metrics_->cache_misses->Increment();
+      }
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
+  bool last_reference = Unref(e);
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+Cache::Handle* LRUCache::Insert(LRUHandle* e, Cache::EvictionCallback *eviction_callback) {
+
+  // Set the remaining LRUHandle members which were not already allocated during
+  // Allocate().
+  e->eviction_callback = eviction_callback;
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  mem_tracker_->Consume(e->charge);
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  LRUHandle* to_remove_head = nullptr;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    LRU_Append(e);
+
+    LRUHandle* old = table_.Insert(e);
+    if (old != nullptr) {
+      LRU_Remove(old);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      LRUHandle* old = lru_.next;
+      LRU_Remove(old);
+      table_.Remove(old->key(), old->hash);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+  }
+
+  // we free the entries here outside of mutex for
+  // performance reasons
+  while (to_remove_head != nullptr) {
+    LRUHandle* next = to_remove_head->next;
+    FreeEntry(to_remove_head);
+    to_remove_head = next;
+  }
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+void LRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* e;
+  bool last_reference = false;
+  {
+    std::lock_guard<MutexType> l(mutex_);
+    e = table_.Remove(key, hash);
+    if (e != nullptr) {
+      LRU_Remove(e);
+      last_reference = Unref(e);
+    }
+  }
+  // mutex not held here
+  // last_reference will only be true if e != NULL
+  if (last_reference) {
+    FreeEntry(e);
+  }
+}
+
+// Determine the number of bits of the hash that should be used to determine
+// the cache shard. This, in turn, determines the number of shards.
+int DetermineShardBits() {
+  int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
+      0 : Bits::Log2Ceiling(base::NumCPUs());
+  VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
+  return bits;
+}
+
+class ShardedLRUCache : public Cache {
+ private:
+  shared_ptr<MemTracker> mem_tracker_;
+  gscoped_ptr<CacheMetrics> metrics_;
+  vector<LRUCache*> shards_;
+  MutexType id_mutex_;
+  uint64_t last_id_;
+
+  // Number of bits of hash used to determine the shard.
+  const int shard_bits_;
+
+  static inline uint32_t HashSlice(const Slice& s) {
+    return util_hash::CityHash64(
+      reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  uint32_t Shard(uint32_t hash) {
+    // Widen to uint64 before shifting, or else on a single CPU,
+    // we would try to shift a uint32_t by 32 bits, which is undefined.
+    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& id)
+      : last_id_(0),
+        shard_bits_(DetermineShardBits()) {
+    // A cache is often a singleton, so:
+    // 1. We reuse its MemTracker if one already exists, and
+    // 2. It is directly parented to the root MemTracker.
+    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
+        -1, strings::Substitute("$0-sharded_lru_cache", id));
+
+    int num_shards = 1 << shard_bits_;
+    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
+    for (int s = 0; s < num_shards; s++) {
+      gscoped_ptr<LRUCache> shard(new LRUCache(mem_tracker_.get()));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+  }
+
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    return shards_[Shard(h->hash)]->Insert(h, eviction_callback);
+  }
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+  }
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle)->value();
+  }
+  virtual uint64_t NewId() OVERRIDE {
+    std::lock_guard<MutexType> l(id_mutex_);
+    return ++(last_id_);
+  }
+
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    metrics_.reset(new CacheMetrics(entity));
+    for (LRUCache* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
+    uint8_t* buf = new uint8_t[sizeof(LRUHandle)
+                               + key_len_padded + val_len // the kv_data VLA data
+                               - 1 // (the VLA has a 1-byte placeholder)
+                               ];
+    LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
+    handle->key_length = key_len;
+    handle->val_length = val_len;
+    handle->charge = charge;
+    handle->hash = HashSlice(key);
+    memcpy(handle->kv_data, key.data(), key_len);
+
+    return reinterpret_cast<PendingHandle*>(handle);
+  }
+
+  virtual void Free(PendingHandle* h) OVERRIDE {
+    uint8_t* data = reinterpret_cast<uint8_t*>(h);
+    delete [] data;
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* h) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(h)->mutable_val_ptr();
+  }
+
+};
+
+}  // end anonymous namespace
+
+Cache* NewLRUCache(CacheType type, size_t capacity, const string& id) {
+  switch (type) {
+    case DRAM_CACHE:
+      return new ShardedLRUCache(capacity, id);
+#if !defined(__APPLE__)
+    case NVM_CACHE:
+      return NewLRUNvmCache(capacity, id);
+#endif
+    default:
+      LOG(FATAL) << "Unsupported LRU cache type: " << type;
+  }
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache.h b/be/src/kudu/util/cache.h
new file mode 100644
index 0000000..af0a9ae
--- /dev/null
+++ b/be/src/kudu/util/cache.h
@@ -0,0 +1,207 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// A Cache is an interface that maps keys to values.  It has internal
+// synchronization and may be safely accessed concurrently from
+// multiple threads.  It may automatically evict entries to make room
+// for new entries.  Values have a specified charge against the cache
+// capacity.  For example, a cache where the values are variable
+// length strings, may use the length of the string as the charge for
+// the string.
+//
+// This is taken from LevelDB and evolved to fit the kudu codebase.
+//
+// TODO: this is pretty lock-heavy. Would be good to sub out something
+// a little more concurrent.
+
+#ifndef KUDU_UTIL_CACHE_H_
+#define KUDU_UTIL_CACHE_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+class Cache;
+struct CacheMetrics;
+class MetricEntity;
+
+enum CacheType {
+  DRAM_CACHE,
+  NVM_CACHE
+};
+
+// Create a new cache with a fixed size capacity.  This implementation
+// of Cache uses a least-recently-used eviction policy.
+Cache* NewLRUCache(CacheType type, size_t capacity, const std::string& id);
+
+class Cache {
+ public:
+  // Callback interface which is called when an entry is evicted from the
+  // cache.
+  class EvictionCallback {
+   public:
+    virtual void EvictedEntry(Slice key, Slice value) = 0;
+    virtual ~EvictionCallback() {}
+  };
+
+  Cache() { }
+
+  // Destroys all existing entries by calling the "deleter"
+  // function that was passed to the constructor.
+  virtual ~Cache();
+
+  // Opaque handle to an entry stored in the cache.
+  struct Handle { };
+
+  // Custom handle "deleter", primarily intended for use with std::unique_ptr.
+  //
+  // Sample usage:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     unique_ptr<Cache::Handle, Cache::HandleDeleter> h(
+  //       cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  // Or:
+  //
+  //   Cache* cache = NewLRUCache(...);
+  //   ...
+  //   {
+  //     Cache::UniqueHandle h(cache->Lookup(...), Cache::HandleDeleter(cache));
+  //     ...
+  //   } // 'h' is automatically released here
+  //
+  class HandleDeleter {
+   public:
+    explicit HandleDeleter(Cache* c)
+        : c_(c) {
+    }
+
+    void operator()(Cache::Handle* h) const {
+      c_->Release(h);
+    }
+
+   private:
+    Cache* c_;
+  };
+  typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
+  // Passing EXPECT_IN_CACHE will increment the hit/miss metrics that track the
+  // number of times blocks were requested that the users were hoping to get the
+  // block from the cache, along with the basic metrics.
+  // Passing NO_EXPECT_IN_CACHE will only increment the basic metrics.
+  // This helps in determining if we are effectively caching the blocks that
+  // matter the most.
+  enum CacheBehavior {
+    EXPECT_IN_CACHE,
+    NO_EXPECT_IN_CACHE
+  };
+
+  // If the cache has no mapping for "key", returns NULL.
+  //
+  // Else return a handle that corresponds to the mapping.  The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed.
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) = 0;
+
+  // Release a mapping returned by a previous Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual void Release(Handle* handle) = 0;
+
+  // Return the value encapsulated in a handle returned by a
+  // successful Lookup().
+  // REQUIRES: handle must not have been released yet.
+  // REQUIRES: handle must have been returned by a method on *this.
+  virtual Slice Value(Handle* handle) = 0;
+
+  // If the cache contains entry for key, erase it.  Note that the
+  // underlying entry will be kept around until all existing handles
+  // to it have been released.
+  virtual void Erase(const Slice& key) = 0;
+
+  // Return a new numeric id.  May be used by multiple clients who are
+  // sharing the same cache to partition the key space.  Typically the
+  // client will allocate a new id at startup and prepend the id to
+  // its cache keys.
+  virtual uint64_t NewId() = 0;
+
+  // Pass a metric entity in order to start recording metrics.
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& metric_entity) = 0;
+
+  // ------------------------------------------------------------
+  // Insertion path
+  // ------------------------------------------------------------
+  //
+  // Because some cache implementations (eg NVM) manage their own memory, and
+  // because we'd like to read blocks directly into cache-managed memory rather
+  // than causing an extra memcpy, the insertion of a new element into the cache
+  // requires two phases. First, a PendingHandle is allocated with space for the
+  // value, and then it is later inserted.
+  //
+  // For example:
+  //
+  //   PendingHandle* ph = cache_->Allocate("my entry", value_size, charge);
+  //   if (!ReadDataFromDisk(cache_->MutableValue(ph)).ok()) {
+  //     cache_->Free(ph);
+  //     ... error handling ...
+  //     return;
+  //   }
+  //   Handle* h = cache_->Insert(ph, my_eviction_callback);
+  //   ...
+  //   cache_->Release(h);
+
+  // Opaque handle to an entry which is being prepared to be added to
+  // the cache.
+  struct PendingHandle { };
+
+  // Allocate space for a new entry to be inserted into the cache.
+  //
+  // The provided 'key' is copied into the resulting handle object.
+  // The allocated handle has enough space such that the value can
+  // be written into cache_->MutableValue(handle).
+  //
+  // Note that this does not mutate the cache itself: lookups will
+  // not be able to find the provided key until it is inserted.
+  //
+  // It is possible that this will return NULL if the cache is above its capacity
+  // and eviction fails to free up enough space for the requested allocation.
+  //
+  // NOTE: the returned memory is not automatically freed by the cache: the
+  // caller must either free it using Free(), or insert it using Insert().
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) = 0;
+
+  virtual uint8_t* MutableValue(PendingHandle* handle) = 0;
+
+  // Commit a prepared entry into the cache.
+  //
+  // Returns a handle that corresponds to the mapping.  The caller
+  // must call this->Release(handle) when the returned mapping is no
+  // longer needed. This method always succeeds and returns a non-null
+  // entry, since the space was reserved above.
+  //
+  // The 'pending' entry passed here should have been allocated using
+  // Cache::Allocate() above.
+  //
+  // If 'eviction_callback' is non-NULL, then it will be called when the
+  // entry is later evicted or when the cache shuts down.
+  virtual Handle* Insert(PendingHandle* pending, EvictionCallback* eviction_callback) = 0;
+
+  // Free 'ptr', which must have been previously allocated using 'Allocate'.
+  virtual void Free(PendingHandle* ptr) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Cache);
+};
+
+}  // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache_metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache_metrics.cc b/be/src/kudu/util/cache_metrics.cc
new file mode 100644
index 0000000..ac2fadf
--- /dev/null
+++ b/be/src/kudu/util/cache_metrics.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/cache_metrics.h"
+
+#include "kudu/util/metrics.h"
+
+METRIC_DEFINE_counter(server, block_cache_inserts,
+                      "Block Cache Inserts", kudu::MetricUnit::kBlocks,
+                      "Number of blocks inserted in the cache");
+METRIC_DEFINE_counter(server, block_cache_lookups,
+                      "Block Cache Lookups", kudu::MetricUnit::kBlocks,
+                      "Number of blocks looked up from the cache");
+METRIC_DEFINE_counter(server, block_cache_evictions,
+                      "Block Cache Evictions", kudu::MetricUnit::kBlocks,
+                      "Number of blocks evicted from the cache");
+METRIC_DEFINE_counter(server, block_cache_misses,
+                      "Block Cache Misses", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that didn't yield a block");
+METRIC_DEFINE_counter(server, block_cache_misses_caching,
+                      "Block Cache Misses (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that didn't yield one."
+                      "Use this number instead of cache_misses when trying to determine how "
+                      "efficient the cache is");
+METRIC_DEFINE_counter(server, block_cache_hits,
+                      "Block Cache Hits", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that found a block");
+METRIC_DEFINE_counter(server, block_cache_hits_caching,
+                      "Block Cache Hits (Caching)", kudu::MetricUnit::kBlocks,
+                      "Number of lookups that were expecting a block that found one."
+                      "Use this number instead of cache_hits when trying to determine how "
+                      "efficient the cache is");
+
+METRIC_DEFINE_gauge_uint64(server, block_cache_usage, "Block Cache Memory Usage",
+                           kudu::MetricUnit::kBytes,
+                           "Memory consumed by the block cache");
+
+namespace kudu {
+
+#define MINIT(member, x) member(METRIC_##x.Instantiate(entity))
+#define GINIT(member, x) member(METRIC_##x.Instantiate(entity, 0))
+CacheMetrics::CacheMetrics(const scoped_refptr<MetricEntity>& entity)
+  : MINIT(inserts, block_cache_inserts),
+    MINIT(lookups, block_cache_lookups),
+    MINIT(evictions, block_cache_evictions),
+    MINIT(cache_hits, block_cache_hits),
+    MINIT(cache_hits_caching, block_cache_hits_caching),
+    MINIT(cache_misses, block_cache_misses),
+    MINIT(cache_misses_caching, block_cache_misses_caching),
+    GINIT(cache_usage, block_cache_usage) {
+}
+#undef MINIT
+#undef GINIT
+
+} // namespace kudu


Reply via email to