http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache_metrics.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cache_metrics.h b/be/src/kudu/util/cache_metrics.h
new file mode 100644
index 0000000..47f759f
--- /dev/null
+++ b/be/src/kudu/util/cache_metrics.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CACHE_METRICS_H
+#define KUDU_UTIL_CACHE_METRICS_H
+
+#include <stdint.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Counter;
+class MetricEntity;
+
+struct CacheMetrics {
+  explicit CacheMetrics(const scoped_refptr<MetricEntity>& metric_entity);
+
+  scoped_refptr<Counter> inserts;
+  scoped_refptr<Counter> lookups;
+  scoped_refptr<Counter> evictions;
+  scoped_refptr<Counter> cache_hits;
+  scoped_refptr<Counter> cache_hits_caching;
+  scoped_refptr<Counter> cache_misses;
+  scoped_refptr<Counter> cache_misses_caching;
+
+  scoped_refptr<AtomicGauge<uint64_t> > cache_usage;
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_CACHE_METRICS_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/callback_bind-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/callback_bind-test.cc 
b/be/src/kudu/util/callback_bind-test.cc
new file mode 100644
index 0000000..6f75867
--- /dev/null
+++ b/be/src/kudu/util/callback_bind-test.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+
+using std::string;
+
+static int Return5() {
+  return 5;
+}
+
+TEST(CallbackBindTest, TestFreeFunction) {
+  Callback<int(void)> func_cb = Bind(&Return5);
+  ASSERT_EQ(5, func_cb.Run());
+}
+
+class Ref : public RefCountedThreadSafe<Ref> {
+ public:
+  int Foo() { return 3; }
+};
+
+// Simple class that helps with verifying ref counting.
+// Not thread-safe.
+struct RefCountable {
+  RefCountable()
+      : refs(0) {
+  }
+  void AddRef() const {
+    refs++;
+  }
+  void Release() const {
+    refs--;
+  }
+  void Print() const {
+    LOG(INFO) << "Hello. Refs: " << refs;
+  }
+
+  mutable int refs;
+  DISALLOW_COPY_AND_ASSIGN(RefCountable);
+};
+
+TEST(CallbackBindTest, TestClassMethod) {
+  scoped_refptr<Ref> ref = new Ref();
+  Callback<int(void)> ref_cb = Bind(&Ref::Foo, ref);
+  ref = nullptr;
+  ASSERT_EQ(3, ref_cb.Run());
+}
+
+int ReturnI(int i, const char* str) {
+  return i;
+}
+
+TEST(CallbackBindTest, TestPartialBind) {
+  Callback<int(const char*)> cb = Bind(&ReturnI, 23);
+  ASSERT_EQ(23, cb.Run("hello world"));
+}
+
+char IncrementChar(gscoped_ptr<char> in) {
+  return *in + 1;
+}
+
+TEST(CallbackBindTest, TestCallScopedPtrArg) {
+  // Calling a function with a gscoped_ptr argument is just like any other
+  // function which takes gscoped_ptr:
+  gscoped_ptr<char> foo(new char('x'));
+  Callback<char(gscoped_ptr<char>)> cb = Bind(&IncrementChar);
+  ASSERT_EQ('y', cb.Run(std::move(foo)));
+}
+
+TEST(CallbackBindTest, TestBindScopedPtrArg) {
+  // Binding a function with a gscoped_ptr argument requires using Passed()
+  gscoped_ptr<char> foo(new char('x'));
+  Callback<char(void)> cb = Bind(&IncrementChar, Passed(&foo));
+  ASSERT_EQ('y', cb.Run());
+}
+
+// Test that the ref counting functionality works.
+TEST(CallbackBindTest, TestRefCounting) {
+  RefCountable countable;
+  {
+    ASSERT_EQ(0, countable.refs);
+    Closure cb = Bind(&RefCountable::Print, &countable);
+    ASSERT_EQ(1, countable.refs);
+    cb.Run();
+    ASSERT_EQ(1, countable.refs);
+  }
+  ASSERT_EQ(0, countable.refs);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding-inl.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding-inl.h b/be/src/kudu/util/coding-inl.h
new file mode 100644
index 0000000..5fe0f9d
--- /dev/null
+++ b/be/src/kudu/util/coding-inl.h
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// Some portions Copyright (c) 2011 The LevelDB Authors.
+//
+// Endian-neutral encoding:
+// * Fixed-length numbers are encoded with least-significant byte first
+// * In addition we support variable length "varint" encoding
+// * Strings are encoded prefixed by their length in varint format
+
+#ifndef KUDU_UTIL_CODING_INL_H
+#define KUDU_UTIL_CODING_INL_H
+
+#include <stdint.h>
+#include <string.h>
+
+namespace kudu {
+
+inline uint8_t *InlineEncodeVarint32(uint8_t *dst, uint32_t v) {
+  // Operate on characters as unsigneds
+  uint8_t *ptr = dst;
+  static const int B = 128;
+  if (v < (1<<7)) {
+    *(ptr++) = v;
+  } else if (v < (1<<14)) {
+    *(ptr++) = v | B;
+    *(ptr++) = v>>7;
+  } else if (v < (1<<21)) {
+    *(ptr++) = v | B;
+    *(ptr++) = (v>>7) | B;
+    *(ptr++) = v>>14;
+  } else if (v < (1<<28)) {
+    *(ptr++) = v | B;
+    *(ptr++) = (v>>7) | B;
+    *(ptr++) = (v>>14) | B;
+    *(ptr++) = v>>21;
+  } else {
+    *(ptr++) = v | B;
+    *(ptr++) = (v>>7) | B;
+    *(ptr++) = (v>>14) | B;
+    *(ptr++) = (v>>21) | B;
+    *(ptr++) = v>>28;
+  }
+  return ptr;
+}
+
+inline void InlineEncodeFixed32(uint8_t *buf, uint32_t value) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  memcpy(buf, &value, sizeof(value));
+#else
+  buf[0] = value & 0xff;
+  buf[1] = (value >> 8) & 0xff;
+  buf[2] = (value >> 16) & 0xff;
+  buf[3] = (value >> 24) & 0xff;
+#endif
+}
+
+inline void InlineEncodeFixed64(uint8_t *buf, uint64_t value) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  memcpy(buf, &value, sizeof(value));
+#else
+  buf[0] = value & 0xff;
+  buf[1] = (value >> 8) & 0xff;
+  buf[2] = (value >> 16) & 0xff;
+  buf[3] = (value >> 24) & 0xff;
+  buf[4] = (value >> 32) & 0xff;
+  buf[5] = (value >> 40) & 0xff;
+  buf[6] = (value >> 48) & 0xff;
+  buf[7] = (value >> 56) & 0xff;
+#endif
+}
+
+
+// Standard Put... routines append to a string
+template <class StrType>
+inline void InlinePutFixed32(StrType *dst, uint32_t value) {
+  uint8_t buf[sizeof(value)];
+  InlineEncodeFixed32(buf, value);
+  dst->append(buf, sizeof(buf));
+}
+
+template <class StrType>
+inline void InlinePutFixed64(StrType *dst, uint64_t value) {
+  uint8_t buf[sizeof(value)];
+  InlineEncodeFixed64(buf, value);
+  dst->append(buf, sizeof(buf));
+}
+
+template <class StrType>
+inline void InlinePutVarint32(StrType* dst, uint32_t v) {
+  // We resize the array and then size it back down as appropriate
+  // rather than using append(), since the generated code ends up
+  // being substantially shorter.
+  int old_size = dst->size();
+  dst->resize(old_size + 5);
+  uint8_t* p = &(*dst)[old_size];
+  uint8_t *ptr = InlineEncodeVarint32(p, v);
+
+  dst->resize(old_size + ptr - p);
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding.cc b/be/src/kudu/util/coding.cc
new file mode 100644
index 0000000..bd3cfcd
--- /dev/null
+++ b/be/src/kudu/util/coding.cc
@@ -0,0 +1,141 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/coding.h"
+#include "kudu/util/coding-inl.h"
+
+namespace kudu {
+
+void PutVarint32(faststring* dst, uint32_t v) {
+  uint8_t buf[5];
+  uint8_t* ptr = InlineEncodeVarint32(buf, v);
+  dst->append(buf, ptr - buf);
+}
+
+uint8_t* EncodeVarint64(uint8_t* dst, uint64_t v) {
+  static const int B = 128;
+  while (v >= B) {
+    *(dst++) = (v & (B-1)) | B;
+    v >>= 7;
+  }
+  *(dst++) = static_cast<uint8_t>(v);
+  return dst;
+}
+
+void PutFixed32(faststring *dst, uint32_t value) {
+  InlinePutFixed32(dst, value);
+}
+
+void PutFixed64(faststring *dst, uint64_t value) {
+  InlinePutFixed64(dst, value);
+}
+
+void PutVarint64(faststring *dst, uint64_t v) {
+  uint8_t buf[10];
+  uint8_t* ptr = EncodeVarint64(buf, v);
+  dst->append(buf, ptr - buf);
+}
+
+void PutLengthPrefixedSlice(faststring* dst, const Slice& value) {
+  PutVarint32(dst, value.size());
+  dst->append(value.data(), value.size());
+}
+
+void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value) {
+  PutFixed32(dst, value.size());
+  dst->append(value.data(), value.size());
+}
+
+int VarintLength(uint64_t v) {
+  int len = 1;
+  while (v >= 128) {
+    v >>= 7;
+    len++;
+  }
+  return len;
+}
+
+const uint8_t *GetVarint32PtrFallback(const uint8_t *p,
+                                   const uint8_t *limit,
+                                   uint32_t* value) {
+  uint32_t result = 0;
+  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
+    uint32_t byte = *p;
+    p++;
+    if (byte & 128) {
+      // More bytes are present
+      result |= ((byte & 127) << shift);
+    } else {
+      result |= (byte << shift);
+      *value = result;
+      return p;
+    }
+  }
+  return nullptr;
+}
+
+bool GetVarint32(Slice* input, uint32_t* value) {
+  const uint8_t *p = input->data();
+  const uint8_t *limit = p + input->size();
+  const uint8_t *q = GetVarint32Ptr(p, limit, value);
+  if (q == nullptr) {
+    return false;
+  } else {
+    *input = Slice(q, limit - q);
+    return true;
+  }
+}
+
+const uint8_t *GetVarint64Ptr(const uint8_t *p, const uint8_t *limit, 
uint64_t* value) {
+  uint64_t result = 0;
+  for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) {
+    uint64_t byte = *p;
+    p++;
+    if (byte & 128) {
+      // More bytes are present
+      result |= ((byte & 127) << shift);
+    } else {
+      result |= (byte << shift);
+      *value = result;
+      return p;
+    }
+  }
+  return nullptr;
+}
+
+bool GetVarint64(Slice* input, uint64_t* value) {
+  const uint8_t *p = input->data();
+  const uint8_t *limit = p + input->size();
+  const uint8_t *q = GetVarint64Ptr(p, limit, value);
+  if (q == nullptr) {
+    return false;
+  } else {
+    *input = Slice(q, limit - q);
+    return true;
+  }
+}
+
+const uint8_t *GetLengthPrefixedSlice(const uint8_t *p, const uint8_t *limit,
+                                   Slice* result) {
+  uint32_t len = 0;
+  p = GetVarint32Ptr(p, limit, &len);
+  if (p == nullptr) return nullptr;
+  if (p + len > limit) return nullptr;
+  *result = Slice(p, len);
+  return p + len;
+}
+
+bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
+  uint32_t len = 0;
+  if (GetVarint32(input, &len) &&
+      input->size() >= len) {
+    *result = Slice(input->data(), len);
+    input->remove_prefix(len);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/coding.h b/be/src/kudu/util/coding.h
new file mode 100644
index 0000000..698d92a
--- /dev/null
+++ b/be/src/kudu/util/coding.h
@@ -0,0 +1,110 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Endian-neutral encoding:
+// * Fixed-length numbers are encoded with least-significant byte first
+// * In addition we support variable length "varint" encoding
+// * Strings are encoded prefixed by their length in varint format
+
+#ifndef STORAGE_LEVELDB_UTIL_CODING_H_
+#define STORAGE_LEVELDB_UTIL_CODING_H_
+
+#include <stdint.h>
+#include <string.h>
+#include <string>
+
+#include "kudu/util/slice.h"
+#include "kudu/util/faststring.h"
+
+namespace kudu {
+extern void PutFixed32(faststring* dst, uint32_t value);
+extern void PutFixed64(faststring* dst, uint64_t value);
+extern void PutVarint32(faststring* dst, uint32_t value);
+extern void PutVarint64(faststring* dst, uint64_t value);
+
+// Put a length-prefixed Slice into the buffer. The length prefix
+// is varint-encoded.
+extern void PutLengthPrefixedSlice(faststring* dst, const Slice& value);
+
+// Put a length-prefixed Slice into the buffer. The length prefix
+// is 32-bit fixed encoded in little endian.
+extern void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value);
+
+// Standard Get... routines parse a value from the beginning of a Slice
+// and advance the slice past the parsed value.
+extern bool GetVarint32(Slice* input, uint32_t* value);
+extern bool GetVarint64(Slice* input, uint64_t* value);
+extern bool GetLengthPrefixedSlice(Slice* input, Slice* result);
+
+// Pointer-based variants of GetVarint...  These either store a value
+// in *v and return a pointer just past the parsed value, or return
+// NULL on error.  These routines only look at bytes in the range
+// [p..limit-1]
+extern const uint8_t *GetVarint32Ptr(const uint8_t *p,const uint8_t *limit, 
uint32_t* v);
+extern const uint8_t *GetVarint64Ptr(const uint8_t *p,const uint8_t *limit, 
uint64_t* v);
+
+// Returns the length of the varint32 or varint64 encoding of "v"
+extern int VarintLength(uint64_t v);
+
+// Lower-level versions of Put... that write directly into a character buffer
+// REQUIRES: dst has enough space for the value being written
+extern void EncodeFixed32(uint8_t *dst, uint32_t value);
+extern void EncodeFixed64(uint8_t *dst, uint64_t value);
+
+// Lower-level versions of Put... that write directly into a character buffer
+// and return a pointer just past the last byte written.
+// REQUIRES: dst has enough space for the value being written
+extern uint8_t *EncodeVarint32(uint8_t *dst, uint32_t value);
+extern uint8_t *EncodeVarint64(uint8_t *dst, uint64_t value);
+
+// Lower-level versions of Get... that read directly from a character buffer
+// without any bounds checking.
+
+inline uint32_t DecodeFixed32(const uint8_t *ptr) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    // Load the raw bytes
+    uint32_t result;
+    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain 
load
+    return result;
+#else
+    return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))
+        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)
+        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)
+        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));
+#endif
+}
+
+inline uint64_t DecodeFixed64(const uint8_t *ptr) {
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+    // Load the raw bytes
+    uint64_t result;
+    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain 
load
+    return result;
+#else
+    uint64_t lo = DecodeFixed32(ptr);
+    uint64_t hi = DecodeFixed32(ptr + 4);
+    return (hi << 32) | lo;
+#endif
+}
+
+// Internal routine for use by fallback path of GetVarint32Ptr
+extern const uint8_t *GetVarint32PtrFallback(const uint8_t *p,
+                                             const uint8_t *limit,
+                                             uint32_t* value);
+inline const uint8_t *GetVarint32Ptr(const uint8_t *p,
+                                     const uint8_t *limit,
+                                     uint32_t* value) {
+  if (PREDICT_TRUE(p < limit)) {
+    uint32_t result = *p;
+    if (PREDICT_TRUE((result & 128) == 0)) {
+      *value = result;
+      return p + 1;
+    }
+  }
+  return GetVarint32PtrFallback(p, limit, value);
+}
+
+}  // namespace kudu
+
+#endif  // STORAGE_LEVELDB_UTIL_CODING_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression-test.cc 
b/be/src/kudu/util/compression/compression-test.cc
new file mode 100644
index 0000000..1befbe5
--- /dev/null
+++ b/be/src/kudu/util/compression/compression-test.cc
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/compression/compression_codec.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+using std::vector;
+
+class TestCompression : public KuduTest {};
+
+static void TestCompressionCodec(CompressionType compression) {
+  const int kInputSize = 64;
+
+  const CompressionCodec* codec;
+  uint8_t ibuffer[kInputSize];
+  uint8_t ubuffer[kInputSize];
+  size_t compressed;
+
+  // Fill the test input buffer
+  memset(ibuffer, 'Z', kInputSize);
+
+  // Get the specified compression codec
+  ASSERT_OK(GetCompressionCodec(compression, &codec));
+
+  // Allocate the compression buffer
+  size_t max_compressed = codec->MaxCompressedLength(kInputSize);
+  ASSERT_LT(max_compressed, (kInputSize * 2));
+  gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]);
+
+  // Compress and uncompress
+  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), 
&compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, 
kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+
+  // Compress slices and uncompress
+  vector<Slice> v;
+  v.push_back(Slice(ibuffer, 1));
+  for (int i = 1; i <= kInputSize; i += 7)
+    v.push_back(Slice(ibuffer + i, 7));
+  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), 
&compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, 
kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+}
+
+TEST_F(TestCompression, TestNoCompressionCodec) {
+  const CompressionCodec* codec;
+  ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec));
+  ASSERT_EQ(nullptr, codec);
+}
+
+TEST_F(TestCompression, TestSnappyCompressionCodec) {
+  TestCompressionCodec(SNAPPY);
+}
+
+TEST_F(TestCompression, TestLz4CompressionCodec) {
+  TestCompressionCodec(LZ4);
+}
+
+TEST_F(TestCompression, TestZlibCompressionCodec) {
+  TestCompressionCodec(ZLIB);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression.proto 
b/be/src/kudu/util/compression/compression.proto
new file mode 100644
index 0000000..a0f5343
--- /dev/null
+++ b/be/src/kudu/util/compression/compression.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+enum CompressionType {
+  UNKNOWN_COMPRESSION = 999;
+  DEFAULT_COMPRESSION = 0;
+  NO_COMPRESSION = 1;
+  SNAPPY = 2;
+  LZ4 = 3;
+  ZLIB = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression_codec.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression_codec.cc 
b/be/src/kudu/util/compression/compression_codec.cc
new file mode 100644
index 0000000..ee774cd
--- /dev/null
+++ b/be/src/kudu/util/compression/compression_codec.cc
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/compression/compression_codec.h"
+
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <lz4.h>
+#include <snappy-sinksource.h>
+#include <snappy.h>
+#include <zlib.h>
+
+
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/string_case.h"
+
+namespace kudu {
+
+using std::vector;
+
+CompressionCodec::CompressionCodec() {
+}
+CompressionCodec::~CompressionCodec() {
+}
+
+class SlicesSource : public snappy::Source {
+ public:
+  explicit SlicesSource(const std::vector<Slice>& slices)
+    : slice_index_(0),
+      slice_offset_(0),
+      slices_(slices) {
+    available_ = TotalSize();
+  }
+
+  size_t Available() const OVERRIDE {
+    return available_;
+  }
+
+  const char* Peek(size_t* len) OVERRIDE {
+    if (available_ == 0) {
+      *len = 0;
+      return nullptr;
+    }
+
+    const Slice& data = slices_[slice_index_];
+    *len = data.size() - slice_offset_;
+    return reinterpret_cast<const char *>(data.data()) + slice_offset_;
+  }
+
+  void Skip(size_t n) OVERRIDE {
+    DCHECK_LE(n, Available());
+    if (n == 0) return;
+
+    available_ -= n;
+    if ((n + slice_offset_) < slices_[slice_index_].size()) {
+      slice_offset_ += n;
+    } else {
+      n -= slices_[slice_index_].size() - slice_offset_;
+      slice_index_++;
+      while (n > 0 && n >= slices_[slice_index_].size()) {
+        n -= slices_[slice_index_].size();
+        slice_index_++;
+      }
+      slice_offset_ = n;
+    }
+  }
+
+  void Dump(faststring *buffer) {
+    buffer->reserve(buffer->size() + TotalSize());
+    for (const Slice& block : slices_) {
+      buffer->append(block.data(), block.size());
+    }
+  }
+
+ private:
+  size_t TotalSize(void) const {
+    size_t size = 0;
+    for (const Slice& data : slices_) {
+      size += data.size();
+    }
+    return size;
+  }
+
+ private:
+  size_t available_;
+  size_t slice_index_;
+  size_t slice_offset_;
+  const vector<Slice>& slices_;
+};
+
+class SnappyCodec : public CompressionCodec {
+ public:
+  static SnappyCodec *GetSingleton() {
+    return Singleton<SnappyCodec>::get();
+  }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    snappy::RawCompress(reinterpret_cast<const char *>(input.data()), 
input.size(),
+                        reinterpret_cast<char *>(compressed), 
compressed_length);
+    return Status::OK();
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    SlicesSource source(input_slices);
+    snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed));
+    if ((*compressed_length = snappy::Compress(&source, &sink)) <= 0) {
+      return Status::Corruption("unable to compress the buffer");
+    }
+    return Status::OK();
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed,
+                    size_t uncompressed_length) const OVERRIDE {
+    bool success = snappy::RawUncompress(reinterpret_cast<const char 
*>(compressed.data()),
+                                         compressed.size(), 
reinterpret_cast<char *>(uncompressed));
+    return success ? Status::OK() : Status::Corruption("unable to uncompress 
the buffer");
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return snappy::MaxCompressedLength(source_bytes);
+  }
+
+  CompressionType type() const override {
+    return SNAPPY;
+  }
+};
+
+class Lz4Codec : public CompressionCodec {
+ public:
+  static Lz4Codec *GetSingleton() {
+    return Singleton<Lz4Codec>::get();
+  }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    int n = LZ4_compress(reinterpret_cast<const char *>(input.data()),
+                         reinterpret_cast<char *>(compressed), input.size());
+    *compressed_length = n;
+    return Status::OK();
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    SlicesSource source(input_slices);
+    faststring buffer;
+    source.Dump(&buffer);
+    return Compress(Slice(buffer.data(), buffer.size()), compressed, 
compressed_length);
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed,
+                    size_t uncompressed_length) const OVERRIDE {
+    int n = LZ4_decompress_fast(reinterpret_cast<const char 
*>(compressed.data()),
+                                reinterpret_cast<char *>(uncompressed), 
uncompressed_length);
+    if (n != compressed.size()) {
+      return Status::Corruption(
+        StringPrintf("unable to uncompress the buffer. error near %d, buffer", 
-n),
+                     KUDU_REDACT(compressed.ToDebugString(100)));
+    }
+    return Status::OK();
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return LZ4_compressBound(source_bytes);
+  }
+
+  CompressionType type() const override {
+    return LZ4;
+  }
+};
+
+/**
+ * TODO: use a instance-local Arena and pass alloc/free into zlib
+ * so that it allocates from the arena.
+ */
+class ZlibCodec : public CompressionCodec {
+ public:
+  static ZlibCodec *GetSingleton() {
+    return Singleton<ZlibCodec>::get();
+  }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    *compressed_length = MaxCompressedLength(input.size());
+    int err = ::compress(compressed, compressed_length, input.data(), 
input.size());
+    return err == Z_OK ? Status::OK() : Status::IOError("unable to compress 
the buffer");
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const 
OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    // TODO: use z_stream
+    SlicesSource source(input_slices);
+    faststring buffer;
+    source.Dump(&buffer);
+    return Compress(Slice(buffer.data(), buffer.size()), compressed, 
compressed_length);
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed, size_t uncompressed_length) const 
OVERRIDE {
+    int err = ::uncompress(uncompressed, &uncompressed_length,
+                           compressed.data(), compressed.size());
+    return err == Z_OK ? Status::OK() : Status::Corruption("unable to 
uncompress the buffer");
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    // one-time overhead of six bytes for the entire stream plus five bytes 
per 16 KB block
+    return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14)));
+  }
+
+  CompressionType type() const override {
+    return ZLIB;
+  }
+};
+
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec) {
+  switch (compression) {
+    case NO_COMPRESSION:
+      *codec = nullptr;
+      break;
+    case SNAPPY:
+      *codec = SnappyCodec::GetSingleton();
+      break;
+    case LZ4:
+      *codec = Lz4Codec::GetSingleton();
+      break;
+    case ZLIB:
+      *codec = ZlibCodec::GetSingleton();
+      break;
+    default:
+      return Status::NotFound("bad compression type");
+  }
+  return Status::OK();
+}
+
+CompressionType GetCompressionCodecType(const std::string& name) {
+  string uname;
+  ToUpperCase(name, &uname);
+
+  if (uname.compare("SNAPPY") == 0)
+    return SNAPPY;
+  if (uname.compare("LZ4") == 0)
+    return LZ4;
+  if (uname.compare("ZLIB") == 0)
+    return ZLIB;
+  if (uname.compare("NONE") == 0)
+    return NO_COMPRESSION;
+
+  LOG(WARNING) << "Unable to recognize the compression codec '" << name
+               << "' using no compression as default.";
+  return NO_COMPRESSION;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression_codec.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/compression/compression_codec.h 
b/be/src/kudu/util/compression/compression_codec.h
new file mode 100644
index 0000000..538af15
--- /dev/null
+++ b/be/src/kudu/util/compression/compression_codec.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CFILE_COMPRESSION_CODEC_H
+#define KUDU_CFILE_COMPRESSION_CODEC_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/util/compression/compression.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class CompressionCodec {
+ public:
+  CompressionCodec();
+  virtual ~CompressionCodec();
+
+  // REQUIRES: "compressed" must point to an area of memory that is at
+  // least "MaxCompressedLength(input_length)" bytes in length.
+  //
+  // Takes the data stored in "input[0..input_length]" and stores
+  // it in the array pointed to by "compressed".
+  //
+  // returns the length of the compressed output.
+  virtual Status Compress(const Slice& input,
+                          uint8_t *compressed, size_t *compressed_length) 
const = 0;
+
+  virtual Status Compress(const std::vector<Slice>& input_slices,
+                          uint8_t *compressed, size_t *compressed_length) 
const = 0;
+
+  // Given data in "compressed[0..compressed_length-1]" generated by
+  // calling the Compress routine, this routine stores the uncompressed data
+  // to uncompressed[0..uncompressed_length-1]
+  // returns false if the message is corrupted and could not be uncompressed
+  virtual Status Uncompress(const Slice& compressed,
+                            uint8_t *uncompressed, size_t uncompressed_length) 
const = 0;
+
+  // Returns the maximal size of the compressed representation of
+  // input data that is "source_bytes" bytes in length.
+  virtual size_t MaxCompressedLength(size_t source_bytes) const = 0;
+
+  // Return the type of compression implemented by this codec.
+  virtual CompressionType type() const = 0;
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CompressionCodec);
+};
+
+// Returns the compression codec for the specified type.
+//
+// The returned codec is a singleton and should be not be destroyed.
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec);
+
+// Returns the compression codec type given the name
+CompressionType GetCompressionCodecType(const std::string& name);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/condition_variable.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/condition_variable.cc 
b/be/src/kudu/util/condition_variable.cc
new file mode 100644
index 0000000..13d1d36
--- /dev/null
+++ b/be/src/kudu/util/condition_variable.cc
@@ -0,0 +1,140 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "kudu/util/condition_variable.h"
+
+#include <glog/logging.h>
+
+#include <errno.h>
+#include <sys/time.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+ConditionVariable::ConditionVariable(Mutex* user_lock)
+    : user_mutex_(&user_lock->native_handle_)
+#if !defined(NDEBUG)
+    , user_lock_(user_lock)
+#endif
+{
+  int rv = 0;
+  // http://crbug.com/293736
+  // NaCl doesn't support monotonic clock based absolute deadlines.
+  // On older Android platform versions, it's supported through the
+  // non-standard pthread_cond_timedwait_monotonic_np. Newer platform
+  // versions have pthread_condattr_setclock.
+  // Mac can use relative time deadlines.
+#if !defined(__APPLE__) && !defined(OS_NACL) && \
+      !(defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC))
+  pthread_condattr_t attrs;
+  rv = pthread_condattr_init(&attrs);
+  DCHECK_EQ(0, rv);
+  pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
+  rv = pthread_cond_init(&condition_, &attrs);
+  pthread_condattr_destroy(&attrs);
+#else
+  rv = pthread_cond_init(&condition_, nullptr);
+#endif
+  DCHECK_EQ(0, rv);
+}
+
+ConditionVariable::~ConditionVariable() {
+#if defined(OS_MACOSX)
+  // This hack is necessary to avoid a fatal pthreads subsystem bug in the
+  // Darwin kernel. https://codereview.chromium.org/1323293005/
+  {
+    Mutex lock;
+    MutexLock l(lock);
+    struct timespec ts;
+    ts.tv_sec = 0;
+    ts.tv_nsec = 1;
+    pthread_cond_timedwait_relative_np(&condition_, lock.native_handle, &ts);
+  }
+#endif
+  int rv = pthread_cond_destroy(&condition_);
+  DCHECK_EQ(0, rv);
+}
+
+void ConditionVariable::Wait() const {
+  ThreadRestrictions::AssertWaitAllowed();
+#if !defined(NDEBUG)
+  user_lock_->CheckHeldAndUnmark();
+#endif
+  int rv = pthread_cond_wait(&condition_, user_mutex_);
+  DCHECK_EQ(0, rv);
+#if !defined(NDEBUG)
+  user_lock_->CheckUnheldAndMark();
+#endif
+}
+
+bool ConditionVariable::TimedWait(const MonoDelta& max_time) const {
+  ThreadRestrictions::AssertWaitAllowed();
+
+  // Negative delta means we've already timed out.
+  int64 nsecs = max_time.ToNanoseconds();
+  if (nsecs < 0) {
+    return false;
+  }
+
+  struct timespec relative_time;
+  max_time.ToTimeSpec(&relative_time);
+
+#if !defined(NDEBUG)
+  user_lock_->CheckHeldAndUnmark();
+#endif
+
+#if defined(__APPLE__)
+  int rv = pthread_cond_timedwait_relative_np(
+      &condition_, user_mutex_, &relative_time);
+#else
+  // The timeout argument to pthread_cond_timedwait is in absolute time.
+  struct timespec absolute_time;
+#if defined(OS_NACL)
+  // See comment in constructor for why this is different in NaCl.
+  struct timeval now;
+  gettimeofday(&now, NULL);
+  absolute_time.tv_sec = now.tv_sec;
+  absolute_time.tv_nsec = now.tv_usec * MonoTime::kNanosecondsPerMicrosecond;
+#else
+  struct timespec now;
+  clock_gettime(CLOCK_MONOTONIC, &now);
+  absolute_time.tv_sec = now.tv_sec;
+  absolute_time.tv_nsec = now.tv_nsec;
+#endif
+
+  absolute_time.tv_sec += relative_time.tv_sec;
+  absolute_time.tv_nsec += relative_time.tv_nsec;
+  absolute_time.tv_sec += absolute_time.tv_nsec / 
MonoTime::kNanosecondsPerSecond;
+  absolute_time.tv_nsec %= MonoTime::kNanosecondsPerSecond;
+  DCHECK_GE(absolute_time.tv_sec, now.tv_sec);  // Overflow paranoia
+
+#if defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC)
+  int rv = pthread_cond_timedwait_monotonic_np(
+      &condition_, user_mutex_, &absolute_time);
+#else
+  int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time);
+#endif  // OS_ANDROID && HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif  // __APPLE__
+
+  DCHECK(rv == 0 || rv == ETIMEDOUT)
+    << "unexpected pthread_cond_timedwait return value: " << rv;
+#if !defined(NDEBUG)
+  user_lock_->CheckUnheldAndMark();
+#endif
+  return rv == 0;
+}
+
+void ConditionVariable::Broadcast() {
+  int rv = pthread_cond_broadcast(&condition_);
+  DCHECK_EQ(0, rv);
+}
+
+void ConditionVariable::Signal() {
+  int rv = pthread_cond_signal(&condition_);
+  DCHECK_EQ(0, rv);
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/condition_variable.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/condition_variable.h 
b/be/src/kudu/util/condition_variable.h
new file mode 100644
index 0000000..ca6e265
--- /dev/null
+++ b/be/src/kudu/util/condition_variable.h
@@ -0,0 +1,113 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// ConditionVariable wraps pthreads condition variable synchronization or, on
+// Windows, simulates it.  This functionality is very helpful for having
+// several threads wait for an event, as is common with a thread pool managed
+// by a master.  The meaning of such an event in the (worker) thread pool
+// scenario is that additional tasks are now available for processing.  It is
+// used in Chrome in the DNS prefetching system to notify worker threads that
+// a queue now has items (tasks) which need to be tended to.  A related use
+// would have a pool manager waiting on a ConditionVariable, waiting for a
+// thread in the pool to announce (signal) that there is now more room in a
+// (bounded size) communications queue for the manager to deposit tasks, or,
+// as a second example, that the queue of tasks is completely empty and all
+// workers are waiting.
+//
+// USAGE NOTE 1: spurious signal events are possible with this and
+// most implementations of condition variables.  As a result, be
+// *sure* to retest your condition before proceeding.  The following
+// is a good example of doing this correctly:
+//
+// while (!work_to_be_done()) Wait(...);
+//
+// In contrast do NOT do the following:
+//
+// if (!work_to_be_done()) Wait(...);  // Don't do this.
+//
+// Especially avoid the above if you are relying on some other thread only
+// issuing a signal up *if* there is work-to-do.  There can/will
+// be spurious signals.  Recheck state on waiting thread before
+// assuming the signal was intentional. Caveat caller ;-).
+//
+// USAGE NOTE 2: Broadcast() frees up all waiting threads at once,
+// which leads to contention for the locks they all held when they
+// called Wait().  This results in POOR performance.  A much better
+// approach to getting a lot of threads out of Wait() is to have each
+// thread (upon exiting Wait()) call Signal() to free up another
+// Wait'ing thread.  Look at condition_variable_unittest.cc for
+// both examples.
+//
+// Broadcast() can be used nicely during teardown, as it gets the job
+// done, and leaves no sleeping threads... and performance is less
+// critical at that point.
+//
+// The semantics of Broadcast() are carefully crafted so that *all*
+// threads that were waiting when the request was made will indeed
+// get signaled.  Some implementations mess up, and don't signal them
+// all, while others allow the wait to be effectively turned off (for
+// a while while waiting threads come around).  This implementation
+// appears correct, as it will not "lose" any signals, and will guarantee
+// that all threads get signaled by Broadcast().
+//
+// This implementation offers support for "performance" in its selection of
+// which thread to revive.  Performance, in direct contrast with "fairness,"
+// assures that the thread that most recently began to Wait() is selected by
+// Signal to revive.  Fairness would (if publicly supported) assure that the
+// thread that has Wait()ed the longest is selected. The default policy
+// may improve performance, as the selected thread may have a greater chance of
+// having some of its stack data in various CPU caches.
+//
+// For a discussion of the many very subtle implementation details, see the FAQ
+// at the end of condition_variable_win.cc.
+
+#ifndef BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+#define BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_
+
+#include <pthread.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+class ConditionVarImpl;
+class TimeDelta;
+
+class ConditionVariable {
+ public:
+  // Construct a cv for use with ONLY one user lock.
+  explicit ConditionVariable(Mutex* user_lock);
+
+  ~ConditionVariable();
+
+  // Wait() releases the caller's critical section atomically as it starts to
+  // sleep, and the reacquires it when it is signaled.
+  void Wait() const;
+
+  // Like Wait(), but only waits up to a limited amount of time.
+  //
+  // Returns true if we were Signal()'ed, or false if 'max_time' elapsed.
+  bool TimedWait(const MonoDelta& max_time) const;
+
+  // Broadcast() revives all waiting threads.
+  void Broadcast();
+  // Signal() revives one waiting thread.
+  void Signal();
+
+ private:
+
+  mutable pthread_cond_t condition_;
+  pthread_mutex_t* user_mutex_;
+
+#if !defined(NDEBUG)
+  Mutex* user_lock_;     // Needed to adjust shadow lock state on wait.
+#endif
+
+  DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
+};
+
+}  // namespace kudu
+
+#endif  // BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/countdown_latch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/countdown_latch-test.cc 
b/be/src/kudu/util/countdown_latch-test.cc
new file mode 100644
index 0000000..cf2517c
--- /dev/null
+++ b/be/src/kudu/util/countdown_latch-test.cc
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+static void DecrementLatch(CountDownLatch* latch, int amount) {
+  if (amount == 1) {
+    latch->CountDown();
+    return;
+  }
+  latch->CountDown(amount);
+}
+
+// Tests that we can decrement the latch by arbitrary amounts, as well
+// as 1 by one.
+TEST(TestCountDownLatch, TestLatch) {
+
+  gscoped_ptr<ThreadPool> pool;
+  ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).Build(&pool));
+
+  CountDownLatch latch(1000);
+
+  // Decrement the count by 1 in another thread, this should not fire the
+  // latch.
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1)));
+  ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200)));
+  ASSERT_EQ(999, latch.count());
+
+  // Now decrement by 1000 this should decrement to 0 and fire the latch
+  // (even though 1000 is one more than the current count).
+  ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1000)));
+  latch.Wait();
+  ASSERT_EQ(0, latch.count());
+}
+
+// Test that resetting to zero while there are waiters lets the waiters
+// continue.
+TEST(TestCountDownLatch, TestResetToZero) {
+  CountDownLatch cdl(100);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "cdl-test", &CountDownLatch::Wait, &cdl, 
&t));
+
+  // Sleep for a bit until it's likely the other thread is waiting on the 
latch.
+  SleepFor(MonoDelta::FromMilliseconds(10));
+  cdl.Reset(0);
+  t->Join();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/countdown_latch.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/countdown_latch.h 
b/be/src/kudu/util/countdown_latch.h
new file mode 100644
index 0000000..7024c1c
--- /dev/null
+++ b/be/src/kudu/util/countdown_latch.h
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_COUNTDOWN_LATCH_H
+#define KUDU_UTIL_COUNTDOWN_LATCH_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+// This is a C++ implementation of the Java CountDownLatch
+// class.
+// See 
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html
+class CountDownLatch {
+ public:
+  // Initialize the latch with the given initial count.
+  explicit CountDownLatch(int count)
+    : cond_(&lock_),
+      count_(count) {
+  }
+
+  // Decrement the count of this latch by 'amount'
+  // If the new count is less than or equal to zero, then all waiting threads 
are woken up.
+  // If the count is already zero, this has no effect.
+  void CountDown(int amount) {
+    DCHECK_GE(amount, 0);
+    MutexLock lock(lock_);
+    if (count_ == 0) {
+      return;
+    }
+
+    if (amount >= count_) {
+      count_ = 0;
+    } else {
+      count_ -= amount;
+    }
+
+    if (count_ == 0) {
+      // Latch has triggered.
+      cond_.Broadcast();
+    }
+  }
+
+  // Decrement the count of this latch.
+  // If the new count is zero, then all waiting threads are woken up.
+  // If the count is already zero, this has no effect.
+  void CountDown() {
+    CountDown(1);
+  }
+
+  // Wait until the count on the latch reaches zero.
+  // If the count is already zero, this returns immediately.
+  void Wait() const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    while (count_ > 0) {
+      cond_.Wait();
+    }
+  }
+
+  // Waits for the count on the latch to reach zero, or until 'until' time is 
reached.
+  // Returns true if the count became zero, false otherwise.
+  bool WaitUntil(const MonoTime& when) const {
+    ThreadRestrictions::AssertWaitAllowed();
+    return WaitFor(when - MonoTime::Now());
+  }
+
+  // Waits for the count on the latch to reach zero, or until 'delta' time 
elapses.
+  // Returns true if the count became zero, false otherwise.
+  bool WaitFor(const MonoDelta& delta) const {
+    ThreadRestrictions::AssertWaitAllowed();
+    MutexLock lock(lock_);
+    while (count_ > 0) {
+      if (!cond_.TimedWait(delta)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Reset the latch with the given count. This is equivalent to reconstructing
+  // the latch. If 'count' is 0, and there are currently waiters, those waiters
+  // will be triggered as if you counted down to 0.
+  void Reset(uint64_t count) {
+    MutexLock lock(lock_);
+    count_ = count;
+    if (count_ == 0) {
+      // Awake any waiters if we reset to 0.
+      cond_.Broadcast();
+    }
+  }
+
+  uint64_t count() const {
+    MutexLock lock(lock_);
+    return count_;
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownLatch);
+  mutable Mutex lock_;
+  ConditionVariable cond_;
+
+  uint64_t count_;
+};
+
+// Utility class which calls latch->CountDown() in its destructor.
+class CountDownOnScopeExit {
+ public:
+  explicit CountDownOnScopeExit(CountDownLatch *latch) : latch_(latch) {}
+  ~CountDownOnScopeExit() {
+    latch_->CountDown();
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CountDownOnScopeExit);
+
+  CountDownLatch *latch_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cow_object.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/cow_object.h b/be/src/kudu/util/cow_object.h
new file mode 100644
index 0000000..10c019e
--- /dev/null
+++ b/be/src/kudu/util/cow_object.h
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_COW_OBJECT_H
+#define KUDU_UTIL_COW_OBJECT_H
+
+#include <glog/logging.h>
+#include <algorithm>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/rwc_lock.h"
+
+namespace kudu {
+
+// An object which manages its state via copy-on-write.
+//
+// Access to this object can be done more conveniently using the
+// CowLock template class defined below.
+//
+// The 'State' template parameter must be swappable using std::swap.
+template<class State>
+class CowObject {
+ public:
+  CowObject() {}
+  ~CowObject() {}
+
+  void ReadLock() const {
+    lock_.ReadLock();
+  }
+
+  void ReadUnlock() const {
+    lock_.ReadUnlock();
+  }
+
+  // Lock the object for write (preventing concurrent mutators), and make a 
safe
+  // copy of the object to mutate.
+  void StartMutation() {
+    lock_.WriteLock();
+    // Clone our object.
+    dirty_state_.reset(new State(state_));
+  }
+
+  // Abort the current mutation. This drops the write lock without applying any
+  // changes made to the mutable copy.
+  void AbortMutation() {
+    dirty_state_.reset();
+    lock_.WriteUnlock();
+  }
+
+  // Commit the current mutation. This escalates to the "Commit" lock, which
+  // blocks any concurrent readers or writers, swaps in the new version of the
+  // State, and then drops the commit lock.
+  void CommitMutation() {
+    lock_.UpgradeToCommitLock();
+    CHECK(dirty_state_);
+    std::swap(state_, *dirty_state_);
+    dirty_state_.reset();
+    lock_.CommitUnlock();
+  }
+
+  // Return the current state, not reflecting any in-progress mutations.
+  State& state() {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  const State& state() const {
+    DCHECK(lock_.HasReaders() || lock_.HasWriteLock());
+    return state_;
+  }
+
+  // Returns the current dirty state (i.e reflecting in-progress mutations).
+  // Should only be called by a thread who previously called StartMutation().
+  State* mutable_dirty() {
+    DCHECK(lock_.HasWriteLock());
+    return DCHECK_NOTNULL(dirty_state_.get());
+  }
+
+  const State& dirty() const {
+    return *DCHECK_NOTNULL(dirty_state_.get());
+  }
+
+ private:
+  mutable RWCLock lock_;
+
+  State state_;
+  gscoped_ptr<State> dirty_state_;
+
+  DISALLOW_COPY_AND_ASSIGN(CowObject);
+};
+
+// A lock-guard-like scoped object to acquire the lock on a CowObject,
+// and obtain a pointer to the correct copy to read/write.
+//
+// Example usage:
+//
+//   CowObject<Foo> my_obj;
+//   {
+//     CowLock<Foo> l(&my_obj, CowLock<Foo>::READ);
+//     l.data().get_foo();
+//     ...
+//   }
+//   {
+//     CowLock<Foo> l(&my_obj, CowLock<Foo>::WRITE);
+//     l->mutable_data()->set_foo(...);
+//     ...
+//     l.Commit();
+//   }
+template<class State>
+class CowLock {
+ public:
+  enum LockMode {
+    READ, WRITE, RELEASED
+  };
+
+  // Lock in either read or write mode.
+  CowLock(CowObject<State>* cow,
+          LockMode mode)
+    : cow_(cow),
+      mode_(mode) {
+    if (mode == READ) {
+      cow_->ReadLock();
+    } else if (mode_ == WRITE) {
+      cow_->StartMutation();
+    } else {
+      LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Lock in read mode.
+  // A const object may not be locked in write mode.
+  CowLock(const CowObject<State>* info,
+          LockMode mode)
+    : cow_(const_cast<CowObject<State>*>(info)),
+      mode_(mode) {
+    if (mode == READ) {
+      cow_->ReadLock();
+    } else if (mode_ == WRITE) {
+      LOG(FATAL) << "Cannot write-lock a const pointer";
+    } else {
+      LOG(FATAL) << "Cannot lock in mode " << mode;
+    }
+  }
+
+  // Commit the underlying object.
+  // Requires that the caller hold the lock in write mode.
+  void Commit() {
+    DCHECK_EQ(WRITE, mode_);
+    cow_->CommitMutation();
+    mode_ = RELEASED;
+  }
+
+  void Unlock() {
+    if (mode_ == READ) {
+      cow_->ReadUnlock();
+    } else if (mode_ == WRITE) {
+      cow_->AbortMutation();
+    } else {
+      DCHECK_EQ(RELEASED, mode_);
+    }
+    mode_ = RELEASED;
+  }
+
+  // Obtain the underlying data. In WRITE mode, this returns the
+  // same data as mutable_data() (not the safe unchanging copy).
+  const State& data() const {
+    if (mode_ == READ) {
+      return cow_->state();
+    } else if (mode_ == WRITE) {
+      return cow_->dirty();
+    } else {
+      LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  // Obtain the mutable data. This may only be called in WRITE mode.
+  State* mutable_data() {
+    if (mode_ == READ) {
+      LOG(FATAL) << "Cannot mutate data with READ lock";
+    } else if (mode_ == WRITE) {
+      return cow_->mutable_dirty();
+    } else {
+      LOG(FATAL) << "Cannot access data after committing";
+    }
+  }
+
+  bool is_write_locked() const {
+    return mode_ == WRITE;
+  }
+
+  // Drop the lock. If the lock is held in WRITE mode, and the
+  // lock has not yet been released, aborts the mutation, restoring
+  // the underlying object to its original data.
+  ~CowLock() {
+    Unlock();
+  }
+
+ private:
+  CowObject<State>* cow_;
+  LockMode mode_;
+  DISALLOW_COPY_AND_ASSIGN(CowLock);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_COW_OBJECT_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc-test.cc b/be/src/kudu/util/crc-test.cc
new file mode 100644
index 0000000..2c6db4b
--- /dev/null
+++ b/be/src/kudu/util/crc-test.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace crc {
+
+using strings::Substitute;
+
+class CrcTest : public KuduTest {
+ protected:
+
+  // Returns pointer to data which must be deleted by caller.
+  static void GenerateBenchmarkData(const uint8_t** bufptr, size_t* buflen) {
+    const uint32_t kNumNumbers = 1000000;
+    const uint32_t kBytesPerNumber = sizeof(uint32_t);
+    const uint32_t kLength = kNumNumbers * kBytesPerNumber;
+    auto buf = new uint8_t[kLength];
+    for (uint32_t i = 0; i < kNumNumbers; i++) {
+      memcpy(buf + (i * kBytesPerNumber), &i, kBytesPerNumber);
+    }
+    *bufptr = buf;
+    *buflen = kLength;
+  }
+
+};
+
+// Basic functionality test.
+TEST_F(CrcTest, TestCRC32C) {
+  const string test_data("abcdefgh");
+  const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage 
test program.
+
+  Crc* crc32c = GetCrc32cInstance();
+  uint64_t data_crc = 0;
+  crc32c->Compute(test_data.data(), test_data.length(), &data_crc);
+  char buf[kFastToBufferSize];
+  const char* output = FastHex64ToBuffer(data_crc, buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (full 64 
bits)";
+  output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf);
+  LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " 
(truncated 32 bits)";
+  ASSERT_EQ(kExpectedCrc, data_crc);
+
+  // Using helper
+  uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length());
+  ASSERT_EQ(kExpectedCrc, data_crc2);
+
+  // Using multiple chunks
+  size_t half_length = test_data.length() / 2;
+  uint64_t data_crc3 = Crc32c(test_data.data(), half_length);
+  data_crc3 = Crc32c(test_data.data() + half_length, half_length, data_crc3);
+  ASSERT_EQ(kExpectedCrc, data_crc3);
+}
+
+// Simple benchmark of CRC32C throughput.
+// We should expect about 8 bytes per cycle in throughput on a single core.
+TEST_F(CrcTest, BenchmarkCRC32C) {
+  gscoped_ptr<const uint8_t[]> data;
+  const uint8_t* buf;
+  size_t buflen;
+  GenerateBenchmarkData(&buf, &buflen);
+  data.reset(buf);
+  Crc* crc32c = GetCrc32cInstance();
+  int kNumRuns = 1000;
+  if (AllowSlowTests()) {
+    kNumRuns = 40000;
+  }
+  const uint64_t kNumBytes = kNumRuns * buflen;
+  Stopwatch sw;
+  sw.start();
+  for (int i = 0; i < kNumRuns; i++) {
+    uint64_t cksum;
+    crc32c->Compute(buf, buflen, &cksum);
+  }
+  sw.stop();
+  CpuTimes elapsed = sw.elapsed();
+  LOG(INFO) << Substitute("$0 runs of CRC32C on $1 bytes of data (total: $2 
bytes)"
+                          " in $3 seconds; $4 bytes per millisecond, $5 bytes 
per nanosecond!",
+                          kNumRuns, buflen, kNumBytes, elapsed.wall_seconds(),
+                          (kNumBytes / elapsed.wall_millis()),
+                          (kNumBytes / elapsed.wall));
+}
+
+} // namespace crc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.cc b/be/src/kudu/util/crc.cc
new file mode 100644
index 0000000..1534b8d
--- /dev/null
+++ b/be/src/kudu/util/crc.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/crc.h"
+
+#include <crcutil/interface.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+
+namespace kudu {
+namespace crc {
+
+using debug::ScopedLeakCheckDisabler;
+
+static GoogleOnceType crc32c_once = GOOGLE_ONCE_INIT;
+static Crc* crc32c_instance = nullptr;
+
+static void InitCrc32cInstance() {
+  ScopedLeakCheckDisabler disabler; // CRC instance is never freed.
+  // TODO: Is initial = 0 and roll window = 4 appropriate for all cases?
+  crc32c_instance = crcutil_interface::CRC::CreateCrc32c(true, 0, 4, nullptr);
+}
+
+Crc* GetCrc32cInstance() {
+  GoogleOnceInit(&crc32c_once, &InitCrc32cInstance);
+  return crc32c_instance;
+}
+
+uint32_t Crc32c(const void* data, size_t length) {
+  uint64_t crc32 = 0;
+  GetCrc32cInstance()->Compute(data, length, &crc32);
+  return static_cast<uint32_t>(crc32); // Only uses lower 32 bits.
+}
+
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) {
+  uint64_t crc_tmp = static_cast<uint64_t>(prev_crc32);
+  GetCrc32cInstance()->Compute(data, length, &crc_tmp);
+  return static_cast<uint32_t>(crc_tmp); // Only uses lower 32 bits.
+}
+
+} // namespace crc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/crc.h b/be/src/kudu/util/crc.h
new file mode 100644
index 0000000..a5db4ea
--- /dev/null
+++ b/be/src/kudu/util/crc.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CRC_H_
+#define KUDU_UTIL_CRC_H_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <crcutil/interface.h>
+
+namespace kudu {
+namespace crc {
+
+typedef crcutil_interface::CRC Crc;
+
+// Returns pointer to singleton instance of CRC32C implementation.
+Crc* GetCrc32cInstance();
+
+// Helper function to simply calculate a CRC32C of the given data.
+uint32_t Crc32c(const void* data, size_t length);
+
+// Given CRC value of previous chunk of data,
+// extends it to new chunk and returns the result.
+uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32);
+
+} // namespace crc
+} // namespace kudu
+
+#endif // KUDU_UTIL_CRC_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/curl_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.cc b/be/src/kudu/util/curl_util.cc
new file mode 100644
index 0000000..6211834
--- /dev/null
+++ b/be/src/kudu/util/curl_util.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/curl_util.h"
+
+
+#include <curl/curl.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/openssl_util.h"
+
+namespace kudu {
+
+namespace {
+
+inline Status TranslateError(CURLcode code) {
+  if (code == CURLE_OK) {
+    return Status::OK();
+  }
+  return Status::NetworkError("curl error", curl_easy_strerror(code));
+}
+
+extern "C" {
+size_t WriteCallback(void* buffer, size_t size, size_t nmemb, void* user_ptr) {
+  size_t real_size = size * nmemb;
+  faststring* buf = reinterpret_cast<faststring*>(user_ptr);
+  CHECK_NOTNULL(buf)->append(reinterpret_cast<const uint8_t*>(buffer), 
real_size);
+  return real_size;
+}
+} // extern "C"
+
+} // anonymous namespace
+
+EasyCurl::EasyCurl() {
+  // Use our own SSL initialization, and disable curl's.
+  // Both of these calls are idempotent.
+  security::InitializeOpenSSL();
+  CHECK_EQ(0, curl_global_init(CURL_GLOBAL_DEFAULT & ~CURL_GLOBAL_SSL));
+  curl_ = curl_easy_init();
+  CHECK(curl_) << "Could not init curl";
+}
+
+EasyCurl::~EasyCurl() {
+  curl_easy_cleanup(curl_);
+}
+
+Status EasyCurl::FetchURL(const std::string& url, faststring* buf) {
+  return DoRequest(url, nullptr, buf);
+}
+
+Status EasyCurl::PostToURL(const std::string& url,
+                           const std::string& post_data,
+                           faststring* dst) {
+  return DoRequest(url, &post_data, dst);
+}
+
+Status EasyCurl::DoRequest(const std::string& url,
+                           const std::string* post_data,
+                           faststring* dst) {
+  CHECK_NOTNULL(dst)->clear();
+
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(
+      curl_, CURLOPT_SSL_VERIFYPEER,
+      static_cast<long>(verify_peer_)))); // NOLINT(runtime/int)
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_URL, 
url.c_str())));
+  if (return_headers_) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HEADER, 1)));
+  }
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, 
WriteCallback)));
+  RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEDATA,
+                                                static_cast<void *>(dst))));
+  if (post_data) {
+    RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_POSTFIELDS,
+                                                  post_data->c_str())));
+  }
+
+  RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_)));
+  long rc; // NOLINT(runtime/int) curl wants a long
+  RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, 
CURLINFO_RESPONSE_CODE, &rc)));
+  if (rc != 200) {
+    return Status::RemoteError(strings::Substitute("HTTP $0", rc));
+  }
+
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/curl_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/curl_util.h b/be/src/kudu/util/curl_util.h
new file mode 100644
index 0000000..797c8a6
--- /dev/null
+++ b/be/src/kudu/util/curl_util.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_CURL_UTIL_H
+#define KUDU_UTIL_CURL_UTIL_H
+
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+typedef void CURL;
+
+namespace kudu {
+
+class faststring;
+
+// Simple wrapper around curl's "easy" interface, allowing the user to
+// fetch web pages into memory using a blocking API.
+//
+// This is not thread-safe.
+class EasyCurl {
+ public:
+  EasyCurl();
+  ~EasyCurl();
+
+  // Fetch the given URL into the provided buffer.
+  // Any existing data in the buffer is replaced.
+  Status FetchURL(const std::string& url,
+                  faststring* dst);
+
+  // Issue an HTTP POST to the given URL with the given data.
+  // Returns results in 'dst' as above.
+  Status PostToURL(const std::string& url,
+                   const std::string& post_data,
+                   faststring* dst);
+
+  // Set whether to verify the server's SSL certificate in the case of an HTTPS
+  // connection.
+  void set_verify_peer(bool verify) {
+    verify_peer_ = verify;
+  }
+
+  void set_return_headers(bool v) {
+    return_headers_ = v;
+  }
+
+ private:
+  // Do a request. If 'post_data' is non-NULL, does a POST.
+  // Otherwise, does a GET.
+  Status DoRequest(const std::string& url,
+                   const std::string* post_data,
+                   faststring* dst);
+  CURL* curl_;
+
+  // Whether to verify the server certificate.
+  bool verify_peer_ = true;
+
+  // Whether to return the HTTP headers with the response.
+  bool return_headers_ = false;
+
+  DISALLOW_COPY_AND_ASSIGN(EasyCurl);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_CURL_UTIL_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/debug-util-test.cc 
b/be/src/kudu/util/debug-util-test.cc
new file mode 100644
index 0000000..74b5b79
--- /dev/null
+++ b/be/src/kudu/util/debug-util-test.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/stl_logging.h>
+#include <signal.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class DebugUtilTest : public KuduTest {
+};
+
+TEST_F(DebugUtilTest, TestStackTrace) {
+  StackTrace t;
+  t.Collect(1);
+  string trace = t.Symbolize();
+  ASSERT_STR_CONTAINS(trace, 
"kudu::DebugUtilTest_TestStackTrace_Test::TestBody");
+}
+
+// DumpThreadStack is only supported on Linux, since the implementation relies
+// on the tgkill syscall which is not portable.
+#if defined(__linux__)
+
+namespace {
+void SleeperThread(CountDownLatch* l) {
+  // We use an infinite loop around WaitFor() instead of a normal Wait()
+  // so that this test passes in TSAN. Without this, we run into this TSAN
+  // bug which prevents the sleeping thread from handling signals:
+  // https://code.google.com/p/thread-sanitizer/issues/detail?id=91
+  while (!l->WaitFor(MonoDelta::FromMilliseconds(10))) {
+  }
+}
+
+void fake_signal_handler(int signum) {}
+
+bool IsSignalHandlerRegistered(int signum) {
+  struct sigaction cur_action;
+  CHECK_EQ(0, sigaction(signum, nullptr, &cur_action));
+  return cur_action.sa_handler != SIG_DFL;
+}
+} // anonymous namespace
+
+TEST_F(DebugUtilTest, TestStackTraceInvalidTid) {
+  string s = DumpThreadStack(1);
+  ASSERT_STR_CONTAINS(s, "unable to deliver signal");
+}
+
+TEST_F(DebugUtilTest, TestStackTraceSelf) {
+  string s = DumpThreadStack(Thread::CurrentThreadId());
+  ASSERT_STR_CONTAINS(s, 
"kudu::DebugUtilTest_TestStackTraceSelf_Test::TestBody()");
+}
+
+TEST_F(DebugUtilTest, TestStackTraceMainThread) {
+  string s = DumpThreadStack(getpid());
+  ASSERT_STR_CONTAINS(s, 
"kudu::DebugUtilTest_TestStackTraceMainThread_Test::TestBody()");
+}
+
+TEST_F(DebugUtilTest, TestSignalStackTrace) {
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  auto cleanup_thr = MakeScopedCleanup([&]() {
+      // Allow the thread to finish.
+      l.CountDown();
+      t->Join();
+    });
+
+  // We have to loop a little bit because it takes a little while for the 
thread
+  // to start up and actually call our function.
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+    });
+
+  // Test that we can change the signal and that the stack traces still work,
+  // on the new signal.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_OK(SetStackTraceSignal(SIGHUP));
+
+  // Should now be registered.
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGHUP));
+
+  // SIGUSR2 should be relinquished.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGUSR2));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Switch back to SIGUSR2 and ensure it changes back.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGUSR2));
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Register our own signal handler on SIGHUP, and ensure that
+  // we get a bad Status if we try to use it.
+  signal(SIGHUP, &fake_signal_handler);
+  ASSERT_STR_CONTAINS(SetStackTraceSignal(SIGHUP).ToString(),
+                      "unable to install signal handler");
+  signal(SIGHUP, SIG_DFL);
+
+  // Stack traces should be disabled
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "unable to take thread 
stack");
+
+  // Re-enable so that other tests pass.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+}
+
+// Test which dumps all known threads within this process.
+// We don't validate the results in any way -- but this verifies that we can
+// dump library threads such as the libc timer_thread and properly time out.
+TEST_F(DebugUtilTest, TestDumpAllThreads) {
+  vector<pid_t> tids;
+  ASSERT_OK(ListThreads(&tids));
+  for (pid_t tid : tids) {
+    LOG(INFO) << DumpThreadStack(tid);
+  }
+}
+#endif
+
+} // namespace kudu

Reply via email to