// ======================================================================
// File: be/src/kudu/util/cache_metrics.h (new file in commit d6abb29d)
// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cache_metrics.h
// ======================================================================
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_CACHE_METRICS_H
#define KUDU_UTIL_CACHE_METRICS_H

#include <stdint.h>

#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"

namespace kudu {

// Forward declarations: only ref-counted handles to these types are stored
// here, so their full definitions are not needed by this header.
template<class T>
class AtomicGauge;
class Counter;
class MetricEntity;

// Bundle of ref-counted metric handles describing a cache's activity.
//
// The constructor is defined out of line and is given the MetricEntity the
// metrics belong to; presumably it instantiates each handle below against
// that entity (TODO confirm in cache_metrics.cc). Which cache operations
// update which field is decided by the cache implementation, not here.
struct CacheMetrics {
  explicit CacheMetrics(const scoped_refptr<MetricEntity>& metric_entity);

  // Event counters; meanings below are inferred from the names only.
  scoped_refptr<Counter> inserts;    // entries inserted
  scoped_refptr<Counter> lookups;    // lookups performed
  scoped_refptr<Counter> evictions;  // entries evicted
  scoped_refptr<Counter> cache_hits;
  // NOTE(review): the *_caching variants presumably count only lookups made
  // with caching enabled - verify against the cache implementation.
  scoped_refptr<Counter> cache_hits_caching;
  scoped_refptr<Counter> cache_misses;
  scoped_refptr<Counter> cache_misses_caching;

  // Gauge of the cache's current usage (units not visible here - likely
  // bytes/charge; TODO confirm).
  scoped_refptr<AtomicGauge<uint64_t> > cache_usage;
};

} // namespace kudu
#endif /* KUDU_UTIL_CACHE_METRICS_H */
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/callback_bind-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/callback_bind-test.cc b/be/src/kudu/util/callback_bind-test.cc new file mode 100644 index 0000000..6f75867 --- /dev/null +++ b/be/src/kudu/util/callback_bind-test.cc @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/callback.h" +#include "kudu/gutil/macros.h" + +#include <gtest/gtest.h> + +namespace kudu { + +using std::string; + +static int Return5() { + return 5; +} + +TEST(CallbackBindTest, TestFreeFunction) { + Callback<int(void)> func_cb = Bind(&Return5); + ASSERT_EQ(5, func_cb.Run()); +} + +class Ref : public RefCountedThreadSafe<Ref> { + public: + int Foo() { return 3; } +}; + +// Simple class that helps with verifying ref counting. +// Not thread-safe. +struct RefCountable { + RefCountable() + : refs(0) { + } + void AddRef() const { + refs++; + } + void Release() const { + refs--; + } + void Print() const { + LOG(INFO) << "Hello. 
Refs: " << refs; + } + + mutable int refs; + DISALLOW_COPY_AND_ASSIGN(RefCountable); +}; + +TEST(CallbackBindTest, TestClassMethod) { + scoped_refptr<Ref> ref = new Ref(); + Callback<int(void)> ref_cb = Bind(&Ref::Foo, ref); + ref = nullptr; + ASSERT_EQ(3, ref_cb.Run()); +} + +int ReturnI(int i, const char* str) { + return i; +} + +TEST(CallbackBindTest, TestPartialBind) { + Callback<int(const char*)> cb = Bind(&ReturnI, 23); + ASSERT_EQ(23, cb.Run("hello world")); +} + +char IncrementChar(gscoped_ptr<char> in) { + return *in + 1; +} + +TEST(CallbackBindTest, TestCallScopedPtrArg) { + // Calling a function with a gscoped_ptr argument is just like any other + // function which takes gscoped_ptr: + gscoped_ptr<char> foo(new char('x')); + Callback<char(gscoped_ptr<char>)> cb = Bind(&IncrementChar); + ASSERT_EQ('y', cb.Run(std::move(foo))); +} + +TEST(CallbackBindTest, TestBindScopedPtrArg) { + // Binding a function with a gscoped_ptr argument requires using Passed() + gscoped_ptr<char> foo(new char('x')); + Callback<char(void)> cb = Bind(&IncrementChar, Passed(&foo)); + ASSERT_EQ('y', cb.Run()); +} + +// Test that the ref counting functionality works. +TEST(CallbackBindTest, TestRefCounting) { + RefCountable countable; + { + ASSERT_EQ(0, countable.refs); + Closure cb = Bind(&RefCountable::Print, &countable); + ASSERT_EQ(1, countable.refs); + cb.Run(); + ASSERT_EQ(1, countable.refs); + } + ASSERT_EQ(0, countable.refs); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding-inl.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding-inl.h b/be/src/kudu/util/coding-inl.h new file mode 100644 index 0000000..5fe0f9d --- /dev/null +++ b/be/src/kudu/util/coding-inl.h @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Some portions Copyright (c) 2011 The LevelDB Authors. +// +// Endian-neutral encoding: +// * Fixed-length numbers are encoded with least-significant byte first +// * In addition we support variable length "varint" encoding +// * Strings are encoded prefixed by their length in varint format + +#ifndef KUDU_UTIL_CODING_INL_H +#define KUDU_UTIL_CODING_INL_H + +#include <stdint.h> +#include <string.h> + +namespace kudu { + +inline uint8_t *InlineEncodeVarint32(uint8_t *dst, uint32_t v) { + // Operate on characters as unsigneds + uint8_t *ptr = dst; + static const int B = 128; + if (v < (1<<7)) { + *(ptr++) = v; + } else if (v < (1<<14)) { + *(ptr++) = v | B; + *(ptr++) = v>>7; + } else if (v < (1<<21)) { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = v>>14; + } else if (v < (1<<28)) { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = (v>>14) | B; + *(ptr++) = v>>21; + } else { + *(ptr++) = v | B; + *(ptr++) = (v>>7) | B; + *(ptr++) = (v>>14) | B; + *(ptr++) = (v>>21) | B; + *(ptr++) = v>>28; + } + return ptr; +} + +inline void InlineEncodeFixed32(uint8_t *buf, uint32_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] 
= (value >> 24) & 0xff; +#endif +} + +inline void InlineEncodeFixed64(uint8_t *buf, uint64_t value) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy(buf, &value, sizeof(value)); +#else + buf[0] = value & 0xff; + buf[1] = (value >> 8) & 0xff; + buf[2] = (value >> 16) & 0xff; + buf[3] = (value >> 24) & 0xff; + buf[4] = (value >> 32) & 0xff; + buf[5] = (value >> 40) & 0xff; + buf[6] = (value >> 48) & 0xff; + buf[7] = (value >> 56) & 0xff; +#endif +} + + +// Standard Put... routines append to a string +template <class StrType> +inline void InlinePutFixed32(StrType *dst, uint32_t value) { + uint8_t buf[sizeof(value)]; + InlineEncodeFixed32(buf, value); + dst->append(buf, sizeof(buf)); +} + +template <class StrType> +inline void InlinePutFixed64(StrType *dst, uint64_t value) { + uint8_t buf[sizeof(value)]; + InlineEncodeFixed64(buf, value); + dst->append(buf, sizeof(buf)); +} + +template <class StrType> +inline void InlinePutVarint32(StrType* dst, uint32_t v) { + // We resize the array and then size it back down as appropriate + // rather than using append(), since the generated code ends up + // being substantially shorter. + int old_size = dst->size(); + dst->resize(old_size + 5); + uint8_t* p = &(*dst)[old_size]; + uint8_t *ptr = InlineEncodeVarint32(p, v); + + dst->resize(old_size + ptr - p); +} + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding.cc b/be/src/kudu/util/coding.cc new file mode 100644 index 0000000..bd3cfcd --- /dev/null +++ b/be/src/kudu/util/coding.cc @@ -0,0 +1,141 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "kudu/util/coding.h" +#include "kudu/util/coding-inl.h" + +namespace kudu { + +void PutVarint32(faststring* dst, uint32_t v) { + uint8_t buf[5]; + uint8_t* ptr = InlineEncodeVarint32(buf, v); + dst->append(buf, ptr - buf); +} + +uint8_t* EncodeVarint64(uint8_t* dst, uint64_t v) { + static const int B = 128; + while (v >= B) { + *(dst++) = (v & (B-1)) | B; + v >>= 7; + } + *(dst++) = static_cast<uint8_t>(v); + return dst; +} + +void PutFixed32(faststring *dst, uint32_t value) { + InlinePutFixed32(dst, value); +} + +void PutFixed64(faststring *dst, uint64_t value) { + InlinePutFixed64(dst, value); +} + +void PutVarint64(faststring *dst, uint64_t v) { + uint8_t buf[10]; + uint8_t* ptr = EncodeVarint64(buf, v); + dst->append(buf, ptr - buf); +} + +void PutLengthPrefixedSlice(faststring* dst, const Slice& value) { + PutVarint32(dst, value.size()); + dst->append(value.data(), value.size()); +} + +void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value) { + PutFixed32(dst, value.size()); + dst->append(value.data(), value.size()); +} + +int VarintLength(uint64_t v) { + int len = 1; + while (v >= 128) { + v >>= 7; + len++; + } + return len; +} + +const uint8_t *GetVarint32PtrFallback(const uint8_t *p, + const uint8_t *limit, + uint32_t* value) { + uint32_t result = 0; + for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) { + uint32_t byte = *p; + p++; + if (byte & 128) { + // More bytes are present + result |= ((byte & 127) << shift); + } else { + result |= (byte << shift); + *value = result; + return p; + } + } + return nullptr; +} + +bool GetVarint32(Slice* input, uint32_t* value) { + const uint8_t *p = input->data(); + const uint8_t *limit = p + input->size(); + const uint8_t *q = GetVarint32Ptr(p, limit, value); + if (q == nullptr) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +const uint8_t *GetVarint64Ptr(const uint8_t *p, const uint8_t *limit, uint64_t* value) { + uint64_t result = 0; + for 
(uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) { + uint64_t byte = *p; + p++; + if (byte & 128) { + // More bytes are present + result |= ((byte & 127) << shift); + } else { + result |= (byte << shift); + *value = result; + return p; + } + } + return nullptr; +} + +bool GetVarint64(Slice* input, uint64_t* value) { + const uint8_t *p = input->data(); + const uint8_t *limit = p + input->size(); + const uint8_t *q = GetVarint64Ptr(p, limit, value); + if (q == nullptr) { + return false; + } else { + *input = Slice(q, limit - q); + return true; + } +} + +const uint8_t *GetLengthPrefixedSlice(const uint8_t *p, const uint8_t *limit, + Slice* result) { + uint32_t len = 0; + p = GetVarint32Ptr(p, limit, &len); + if (p == nullptr) return nullptr; + if (p + len > limit) return nullptr; + *result = Slice(p, len); + return p + len; +} + +bool GetLengthPrefixedSlice(Slice* input, Slice* result) { + uint32_t len = 0; + if (GetVarint32(input, &len) && + input->size() >= len) { + *result = Slice(input->data(), len); + input->remove_prefix(len); + return true; + } else { + return false; + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/coding.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/coding.h b/be/src/kudu/util/coding.h new file mode 100644 index 0000000..698d92a --- /dev/null +++ b/be/src/kudu/util/coding.h @@ -0,0 +1,110 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+// +// Endian-neutral encoding: +// * Fixed-length numbers are encoded with least-significant byte first +// * In addition we support variable length "varint" encoding +// * Strings are encoded prefixed by their length in varint format + +#ifndef STORAGE_LEVELDB_UTIL_CODING_H_ +#define STORAGE_LEVELDB_UTIL_CODING_H_ + +#include <stdint.h> +#include <string.h> +#include <string> + +#include "kudu/util/slice.h" +#include "kudu/util/faststring.h" + +namespace kudu { +extern void PutFixed32(faststring* dst, uint32_t value); +extern void PutFixed64(faststring* dst, uint64_t value); +extern void PutVarint32(faststring* dst, uint32_t value); +extern void PutVarint64(faststring* dst, uint64_t value); + +// Put a length-prefixed Slice into the buffer. The length prefix +// is varint-encoded. +extern void PutLengthPrefixedSlice(faststring* dst, const Slice& value); + +// Put a length-prefixed Slice into the buffer. The length prefix +// is 32-bit fixed encoded in little endian. +extern void PutFixed32LengthPrefixedSlice(faststring* dst, const Slice& value); + +// Standard Get... routines parse a value from the beginning of a Slice +// and advance the slice past the parsed value. +extern bool GetVarint32(Slice* input, uint32_t* value); +extern bool GetVarint64(Slice* input, uint64_t* value); +extern bool GetLengthPrefixedSlice(Slice* input, Slice* result); + +// Pointer-based variants of GetVarint... These either store a value +// in *v and return a pointer just past the parsed value, or return +// NULL on error. These routines only look at bytes in the range +// [p..limit-1] +extern const uint8_t *GetVarint32Ptr(const uint8_t *p,const uint8_t *limit, uint32_t* v); +extern const uint8_t *GetVarint64Ptr(const uint8_t *p,const uint8_t *limit, uint64_t* v); + +// Returns the length of the varint32 or varint64 encoding of "v" +extern int VarintLength(uint64_t v); + +// Lower-level versions of Put... 
that write directly into a character buffer +// REQUIRES: dst has enough space for the value being written +extern void EncodeFixed32(uint8_t *dst, uint32_t value); +extern void EncodeFixed64(uint8_t *dst, uint64_t value); + +// Lower-level versions of Put... that write directly into a character buffer +// and return a pointer just past the last byte written. +// REQUIRES: dst has enough space for the value being written +extern uint8_t *EncodeVarint32(uint8_t *dst, uint32_t value); +extern uint8_t *EncodeVarint64(uint8_t *dst, uint64_t value); + +// Lower-level versions of Get... that read directly from a character buffer +// without any bounds checking. + +inline uint32_t DecodeFixed32(const uint8_t *ptr) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + // Load the raw bytes + uint32_t result; + memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load + return result; +#else + return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0]))) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16) + | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24)); +#endif +} + +inline uint64_t DecodeFixed64(const uint8_t *ptr) { +#if __BYTE_ORDER == __LITTLE_ENDIAN + // Load the raw bytes + uint64_t result; + memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load + return result; +#else + uint64_t lo = DecodeFixed32(ptr); + uint64_t hi = DecodeFixed32(ptr + 4); + return (hi << 32) | lo; +#endif +} + +// Internal routine for use by fallback path of GetVarint32Ptr +extern const uint8_t *GetVarint32PtrFallback(const uint8_t *p, + const uint8_t *limit, + uint32_t* value); +inline const uint8_t *GetVarint32Ptr(const uint8_t *p, + const uint8_t *limit, + uint32_t* value) { + if (PREDICT_TRUE(p < limit)) { + uint32_t result = *p; + if (PREDICT_TRUE((result & 128) == 0)) { + *value = result; + return p + 1; + } + } + return GetVarint32PtrFallback(p, 
limit, value); +} + +} // namespace kudu + +#endif // STORAGE_LEVELDB_UTIL_CODING_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression-test.cc b/be/src/kudu/util/compression/compression-test.cc new file mode 100644 index 0000000..1befbe5 --- /dev/null +++ b/be/src/kudu/util/compression/compression-test.cc @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <stdlib.h> + +#include <vector> + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/util/compression/compression_codec.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/status.h" + +namespace kudu { + +using std::vector; + +class TestCompression : public KuduTest {}; + +static void TestCompressionCodec(CompressionType compression) { + const int kInputSize = 64; + + const CompressionCodec* codec; + uint8_t ibuffer[kInputSize]; + uint8_t ubuffer[kInputSize]; + size_t compressed; + + // Fill the test input buffer + memset(ibuffer, 'Z', kInputSize); + + // Get the specified compression codec + ASSERT_OK(GetCompressionCodec(compression, &codec)); + + // Allocate the compression buffer + size_t max_compressed = codec->MaxCompressedLength(kInputSize); + ASSERT_LT(max_compressed, (kInputSize * 2)); + gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]); + + // Compress and uncompress + ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed)); + ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize)); + ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize)); + + // Compress slices and uncompress + vector<Slice> v; + v.push_back(Slice(ibuffer, 1)); + for (int i = 1; i <= kInputSize; i += 7) + v.push_back(Slice(ibuffer + i, 7)); + ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed)); + ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize)); + ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize)); +} + +TEST_F(TestCompression, TestNoCompressionCodec) { + const CompressionCodec* codec; + ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec)); + ASSERT_EQ(nullptr, codec); +} + +TEST_F(TestCompression, TestSnappyCompressionCodec) { + TestCompressionCodec(SNAPPY); +} + +TEST_F(TestCompression, TestLz4CompressionCodec) { + TestCompressionCodec(LZ4); +} + +TEST_F(TestCompression, 
TestZlibCompressionCodec) { + TestCompressionCodec(ZLIB); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression.proto b/be/src/kudu/util/compression/compression.proto new file mode 100644 index 0000000..a0f5343 --- /dev/null +++ b/be/src/kudu/util/compression/compression.proto @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
syntax = "proto2";
package kudu;

option java_package = "org.apache.kudu";

// Identifies the compression algorithm applied to a block of data.
// Enum numbers are part of the protobuf wire encoding, so existing values
// must never be renumbered or reused.
enum CompressionType {
  // Listed first, so under proto2 rules it is the value reported for an
  // unset optional field of this type. Keep it first.
  UNKNOWN_COMPRESSION = 999;
  // NOTE(review): GetCompressionCodec() in compression_codec.cc has no case
  // for DEFAULT_COMPRESSION (or UNKNOWN_COMPRESSION) and returns NotFound
  // for it; callers presumably map it to a concrete codec first - confirm.
  DEFAULT_COMPRESSION = 0;
  NO_COMPRESSION = 1;
  SNAPPY = 2;
  LZ4 = 3;
  ZLIB = 4;
}

// ======================================================================
// File: be/src/kudu/util/compression/compression_codec.cc (new file in commit d6abb29d)
// http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression_codec.cc
// ======================================================================
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
+ +#include "kudu/util/compression/compression_codec.h" + +#include <string> +#include <vector> + +#include <glog/logging.h> +#include <lz4.h> +#include <snappy-sinksource.h> +#include <snappy.h> +#include <zlib.h> + + +#include "kudu/gutil/singleton.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/util/logging.h" +#include "kudu/util/string_case.h" + +namespace kudu { + +using std::vector; + +CompressionCodec::CompressionCodec() { +} +CompressionCodec::~CompressionCodec() { +} + +class SlicesSource : public snappy::Source { + public: + explicit SlicesSource(const std::vector<Slice>& slices) + : slice_index_(0), + slice_offset_(0), + slices_(slices) { + available_ = TotalSize(); + } + + size_t Available() const OVERRIDE { + return available_; + } + + const char* Peek(size_t* len) OVERRIDE { + if (available_ == 0) { + *len = 0; + return nullptr; + } + + const Slice& data = slices_[slice_index_]; + *len = data.size() - slice_offset_; + return reinterpret_cast<const char *>(data.data()) + slice_offset_; + } + + void Skip(size_t n) OVERRIDE { + DCHECK_LE(n, Available()); + if (n == 0) return; + + available_ -= n; + if ((n + slice_offset_) < slices_[slice_index_].size()) { + slice_offset_ += n; + } else { + n -= slices_[slice_index_].size() - slice_offset_; + slice_index_++; + while (n > 0 && n >= slices_[slice_index_].size()) { + n -= slices_[slice_index_].size(); + slice_index_++; + } + slice_offset_ = n; + } + } + + void Dump(faststring *buffer) { + buffer->reserve(buffer->size() + TotalSize()); + for (const Slice& block : slices_) { + buffer->append(block.data(), block.size()); + } + } + + private: + size_t TotalSize(void) const { + size_t size = 0; + for (const Slice& data : slices_) { + size += data.size(); + } + return size; + } + + private: + size_t available_; + size_t slice_index_; + size_t slice_offset_; + const vector<Slice>& slices_; +}; + +class SnappyCodec : public CompressionCodec { + public: + static SnappyCodec *GetSingleton() { + return 
Singleton<SnappyCodec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(), + reinterpret_cast<char *>(compressed), compressed_length); + return Status::OK(); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + SlicesSource source(input_slices); + snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed)); + if ((*compressed_length = snappy::Compress(&source, &sink)) <= 0) { + return Status::Corruption("unable to compress the buffer"); + } + return Status::OK(); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, + size_t uncompressed_length) const OVERRIDE { + bool success = snappy::RawUncompress(reinterpret_cast<const char *>(compressed.data()), + compressed.size(), reinterpret_cast<char *>(uncompressed)); + return success ? Status::OK() : Status::Corruption("unable to uncompress the buffer"); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + return snappy::MaxCompressedLength(source_bytes); + } + + CompressionType type() const override { + return SNAPPY; + } +}; + +class Lz4Codec : public CompressionCodec { + public: + static Lz4Codec *GetSingleton() { + return Singleton<Lz4Codec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + int n = LZ4_compress(reinterpret_cast<const char *>(input.data()), + reinterpret_cast<char *>(compressed), input.size()); + *compressed_length = n; + return Status::OK(); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + if (input_slices.size() == 1) { + return Compress(input_slices[0], compressed, compressed_length); + } + + SlicesSource source(input_slices); + faststring buffer; + source.Dump(&buffer); + 
return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, + size_t uncompressed_length) const OVERRIDE { + int n = LZ4_decompress_fast(reinterpret_cast<const char *>(compressed.data()), + reinterpret_cast<char *>(uncompressed), uncompressed_length); + if (n != compressed.size()) { + return Status::Corruption( + StringPrintf("unable to uncompress the buffer. error near %d, buffer", -n), + KUDU_REDACT(compressed.ToDebugString(100))); + } + return Status::OK(); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + return LZ4_compressBound(source_bytes); + } + + CompressionType type() const override { + return LZ4; + } +}; + +/** + * TODO: use a instance-local Arena and pass alloc/free into zlib + * so that it allocates from the arena. + */ +class ZlibCodec : public CompressionCodec { + public: + static ZlibCodec *GetSingleton() { + return Singleton<ZlibCodec>::get(); + } + + Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + *compressed_length = MaxCompressedLength(input.size()); + int err = ::compress(compressed, compressed_length, input.data(), input.size()); + return err == Z_OK ? 
Status::OK() : Status::IOError("unable to compress the buffer"); + } + + Status Compress(const vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const OVERRIDE { + if (input_slices.size() == 1) { + return Compress(input_slices[0], compressed, compressed_length); + } + + // TODO: use z_stream + SlicesSource source(input_slices); + faststring buffer; + source.Dump(&buffer); + return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length); + } + + Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE { + int err = ::uncompress(uncompressed, &uncompressed_length, + compressed.data(), compressed.size()); + return err == Z_OK ? Status::OK() : Status::Corruption("unable to uncompress the buffer"); + } + + size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE { + // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block + return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14))); + } + + CompressionType type() const override { + return ZLIB; + } +}; + +Status GetCompressionCodec(CompressionType compression, + const CompressionCodec** codec) { + switch (compression) { + case NO_COMPRESSION: + *codec = nullptr; + break; + case SNAPPY: + *codec = SnappyCodec::GetSingleton(); + break; + case LZ4: + *codec = Lz4Codec::GetSingleton(); + break; + case ZLIB: + *codec = ZlibCodec::GetSingleton(); + break; + default: + return Status::NotFound("bad compression type"); + } + return Status::OK(); +} + +CompressionType GetCompressionCodecType(const std::string& name) { + string uname; + ToUpperCase(name, &uname); + + if (uname.compare("SNAPPY") == 0) + return SNAPPY; + if (uname.compare("LZ4") == 0) + return LZ4; + if (uname.compare("ZLIB") == 0) + return ZLIB; + if (uname.compare("NONE") == 0) + return NO_COMPRESSION; + + LOG(WARNING) << "Unable to recognize the compression codec '" << name + << "' using no compression as default."; + 
return NO_COMPRESSION; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/compression/compression_codec.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/compression/compression_codec.h b/be/src/kudu/util/compression/compression_codec.h new file mode 100644 index 0000000..538af15 --- /dev/null +++ b/be/src/kudu/util/compression/compression_codec.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_CFILE_COMPRESSION_CODEC_H +#define KUDU_CFILE_COMPRESSION_CODEC_H + +#include <string> +#include <vector> + +#include "kudu/util/compression/compression.pb.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" + +namespace kudu { + +class CompressionCodec { + public: + CompressionCodec(); + virtual ~CompressionCodec(); + + // REQUIRES: "compressed" must point to an area of memory that is at + // least "MaxCompressedLength(input_length)" bytes in length. + // + // Takes the data stored in "input[0..input_length]" and stores + // it in the array pointed to by "compressed". + // + // returns the length of the compressed output. 
+ virtual Status Compress(const Slice& input, + uint8_t *compressed, size_t *compressed_length) const = 0; + + virtual Status Compress(const std::vector<Slice>& input_slices, + uint8_t *compressed, size_t *compressed_length) const = 0; + + // Given data in "compressed[0..compressed_length-1]" generated by + // calling the Compress routine, this routine stores the uncompressed data + // to uncompressed[0..uncompressed_length-1] + // returns false if the message is corrupted and could not be uncompressed + virtual Status Uncompress(const Slice& compressed, + uint8_t *uncompressed, size_t uncompressed_length) const = 0; + + // Returns the maximal size of the compressed representation of + // input data that is "source_bytes" bytes in length. + virtual size_t MaxCompressedLength(size_t source_bytes) const = 0; + + // Return the type of compression implemented by this codec. + virtual CompressionType type() const = 0; + private: + DISALLOW_COPY_AND_ASSIGN(CompressionCodec); +}; + +// Returns the compression codec for the specified type. +// +// The returned codec is a singleton and should be not be destroyed. +Status GetCompressionCodec(CompressionType compression, + const CompressionCodec** codec); + +// Returns the compression codec type given the name +CompressionType GetCompressionCodecType(const std::string& name); + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/condition_variable.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/condition_variable.cc b/be/src/kudu/util/condition_variable.cc new file mode 100644 index 0000000..13d1d36 --- /dev/null +++ b/be/src/kudu/util/condition_variable.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#include "kudu/util/condition_variable.h" + +#include <glog/logging.h> + +#include <errno.h> +#include <sys/time.h> + +#include "kudu/util/monotime.h" +#include "kudu/util/thread_restrictions.h" + +namespace kudu { + +ConditionVariable::ConditionVariable(Mutex* user_lock) + : user_mutex_(&user_lock->native_handle_) +#if !defined(NDEBUG) + , user_lock_(user_lock) +#endif +{ + int rv = 0; + // http://crbug.com/293736 + // NaCl doesn't support monotonic clock based absolute deadlines. + // On older Android platform versions, it's supported through the + // non-standard pthread_cond_timedwait_monotonic_np. Newer platform + // versions have pthread_condattr_setclock. + // Mac can use relative time deadlines. +#if !defined(__APPLE__) && !defined(OS_NACL) && \ + !(defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC)) + pthread_condattr_t attrs; + rv = pthread_condattr_init(&attrs); + DCHECK_EQ(0, rv); + pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC); + rv = pthread_cond_init(&condition_, &attrs); + pthread_condattr_destroy(&attrs); +#else + rv = pthread_cond_init(&condition_, nullptr); +#endif + DCHECK_EQ(0, rv); +} + +ConditionVariable::~ConditionVariable() { +#if defined(__APPLE__) + // This hack is necessary to avoid a fatal pthreads subsystem bug in the + // Darwin kernel.
https://codereview.chromium.org/1323293005/ + { + Mutex lock; + MutexLock l(lock); + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + pthread_cond_timedwait_relative_np(&condition_, &lock.native_handle_, &ts); + } +#endif // __APPLE__ + int rv = pthread_cond_destroy(&condition_); + DCHECK_EQ(0, rv); +} + +void ConditionVariable::Wait() const { + ThreadRestrictions::AssertWaitAllowed(); +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + int rv = pthread_cond_wait(&condition_, user_mutex_); + DCHECK_EQ(0, rv); +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif +} + +bool ConditionVariable::TimedWait(const MonoDelta& max_time) const { + ThreadRestrictions::AssertWaitAllowed(); + + // Negative delta means we've already timed out. + int64 nsecs = max_time.ToNanoseconds(); + if (nsecs < 0) { + return false; + } + + struct timespec relative_time; + max_time.ToTimeSpec(&relative_time); + +#if !defined(NDEBUG) + user_lock_->CheckHeldAndUnmark(); +#endif + +#if defined(__APPLE__) + int rv = pthread_cond_timedwait_relative_np( + &condition_, user_mutex_, &relative_time); +#else + // The timeout argument to pthread_cond_timedwait is in absolute time. + struct timespec absolute_time; +#if defined(OS_NACL) + // See comment in constructor for why this is different in NaCl.
+ struct timeval now; + gettimeofday(&now, NULL); + absolute_time.tv_sec = now.tv_sec; + absolute_time.tv_nsec = now.tv_usec * MonoTime::kNanosecondsPerMicrosecond; +#else + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + absolute_time.tv_sec = now.tv_sec; + absolute_time.tv_nsec = now.tv_nsec; +#endif + + absolute_time.tv_sec += relative_time.tv_sec; + absolute_time.tv_nsec += relative_time.tv_nsec; + absolute_time.tv_sec += absolute_time.tv_nsec / MonoTime::kNanosecondsPerSecond; + absolute_time.tv_nsec %= MonoTime::kNanosecondsPerSecond; + DCHECK_GE(absolute_time.tv_sec, now.tv_sec); // Overflow paranoia + +#if defined(OS_ANDROID) && defined(HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC) + int rv = pthread_cond_timedwait_monotonic_np( + &condition_, user_mutex_, &absolute_time); +#else + int rv = pthread_cond_timedwait(&condition_, user_mutex_, &absolute_time); +#endif // OS_ANDROID && HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC +#endif // __APPLE__ + + DCHECK(rv == 0 || rv == ETIMEDOUT) + << "unexpected pthread_cond_timedwait return value: " << rv; +#if !defined(NDEBUG) + user_lock_->CheckUnheldAndMark(); +#endif + return rv == 0; +} + +void ConditionVariable::Broadcast() { + int rv = pthread_cond_broadcast(&condition_); + DCHECK_EQ(0, rv); +} + +void ConditionVariable::Signal() { + int rv = pthread_cond_signal(&condition_); + DCHECK_EQ(0, rv); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/condition_variable.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/condition_variable.h b/be/src/kudu/util/condition_variable.h new file mode 100644 index 0000000..ca6e265 --- /dev/null +++ b/be/src/kudu/util/condition_variable.h @@ -0,0 +1,113 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file.
+ +// ConditionVariable wraps pthreads condition variable synchronization or, on +// Windows, simulates it. This functionality is very helpful for having +// several threads wait for an event, as is common with a thread pool managed +// by a master. The meaning of such an event in the (worker) thread pool +// scenario is that additional tasks are now available for processing. It is +// used in Chrome in the DNS prefetching system to notify worker threads that +// a queue now has items (tasks) which need to be tended to. A related use +// would have a pool manager waiting on a ConditionVariable, waiting for a +// thread in the pool to announce (signal) that there is now more room in a +// (bounded size) communications queue for the manager to deposit tasks, or, +// as a second example, that the queue of tasks is completely empty and all +// workers are waiting. +// +// USAGE NOTE 1: spurious signal events are possible with this and +// most implementations of condition variables. As a result, be +// *sure* to retest your condition before proceeding. The following +// is a good example of doing this correctly: +// +// while (!work_to_be_done()) Wait(...); +// +// In contrast do NOT do the following: +// +// if (!work_to_be_done()) Wait(...); // Don't do this. +// +// Especially avoid the above if you are relying on some other thread only +// issuing a signal up *if* there is work-to-do. There can/will +// be spurious signals. Recheck state on waiting thread before +// assuming the signal was intentional. Caveat caller ;-). +// +// USAGE NOTE 2: Broadcast() frees up all waiting threads at once, +// which leads to contention for the locks they all held when they +// called Wait(). This results in POOR performance. A much better +// approach to getting a lot of threads out of Wait() is to have each +// thread (upon exiting Wait()) call Signal() to free up another +// Wait'ing thread. Look at condition_variable_unittest.cc for +// both examples. 
+// +// Broadcast() can be used nicely during teardown, as it gets the job +// done, and leaves no sleeping threads... and performance is less +// critical at that point. +// +// The semantics of Broadcast() are carefully crafted so that *all* +// threads that were waiting when the request was made will indeed +// get signaled. Some implementations mess up, and don't signal them +// all, while others allow the wait to be effectively turned off (for +// a while while waiting threads come around). This implementation +// appears correct, as it will not "lose" any signals, and will guarantee +// that all threads get signaled by Broadcast(). +// +// NOTE: Chromium's original text here described a "performance over fairness" +// wakeup policy provided by its Windows emulation. This port calls +// pthread_cond_signal() directly, so which waiting thread Signal() revives is +// decided by the pthreads library; POSIX leaves that order to the scheduling +// policy, so callers must not rely on most-recent-first or +// first-come-first-served wakeup order. Write waiting code so that it is +// correct under any wakeup order. +// +// For a discussion of the many very subtle implementation details, see the FAQ +// at the end of Chromium's condition_variable_win.cc (not part of this port). + +#ifndef BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ +#define BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ + +#include <pthread.h> + +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" + +namespace kudu { + +class ConditionVarImpl; +class TimeDelta; + +class ConditionVariable { + public: + // Construct a cv for use with ONLY one user lock. + explicit ConditionVariable(Mutex* user_lock); + + ~ConditionVariable(); + + // Wait() releases the caller's critical section atomically as it starts to + // sleep, and then reacquires it when it is signaled. + void Wait() const; + + // Like Wait(), but only waits up to a limited amount of time.
+ // Returns false if 'max_time' elapsed (or is negative), otherwise true. + // A true result may be a spurious wakeup, so callers must re-check their condition. + bool TimedWait(const MonoDelta& max_time) const; + + // Broadcast() revives all waiting threads. + void Broadcast(); + // Signal() revives one waiting thread. + void Signal(); + + private: + + mutable pthread_cond_t condition_; + pthread_mutex_t* user_mutex_; + +#if !defined(NDEBUG) + Mutex* user_lock_; // Needed to adjust shadow lock state on wait. +#endif + + DISALLOW_COPY_AND_ASSIGN(ConditionVariable); +}; + +} // namespace kudu + +#endif // BASE_SYNCHRONIZATION_CONDITION_VARIABLE_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/countdown_latch-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/countdown_latch-test.cc b/be/src/kudu/util/countdown_latch-test.cc new file mode 100644 index 0000000..cf2517c --- /dev/null +++ b/be/src/kudu/util/countdown_latch-test.cc @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include <boost/bind.hpp> +#include <gtest/gtest.h> + +#include "kudu/util/countdown_latch.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadpool.h" + +namespace kudu { + +static void DecrementLatch(CountDownLatch* latch, int amount) { + if (amount == 1) { + latch->CountDown(); + return; + } + latch->CountDown(amount); +} + +// Tests that we can decrement the latch by arbitrary amounts, as well +// as one by one. +TEST(TestCountDownLatch, TestLatch) { + + gscoped_ptr<ThreadPool> pool; + ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).Build(&pool)); + + CountDownLatch latch(1000); + + // Decrement the count by 1 in another thread, this should not fire the + // latch. + ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1))); + ASSERT_FALSE(latch.WaitFor(MonoDelta::FromMilliseconds(200))); + ASSERT_EQ(999, latch.count()); + + // Now decrement by 1000 this should decrement to 0 and fire the latch + // (even though 1000 is one more than the current count). + ASSERT_OK(pool->SubmitFunc(boost::bind(DecrementLatch, &latch, 1000))); + latch.Wait(); + ASSERT_EQ(0, latch.count()); +} + +// Test that resetting to zero while there are waiters lets the waiters +// continue. +TEST(TestCountDownLatch, TestResetToZero) { + CountDownLatch cdl(100); + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", "cdl-test", &CountDownLatch::Wait, &cdl, &t)); + + // Sleep for a bit until it's likely the other thread is waiting on the latch.
+ SleepFor(MonoDelta::FromMilliseconds(10)); + cdl.Reset(0); + t->Join(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/countdown_latch.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/countdown_latch.h b/be/src/kudu/util/countdown_latch.h new file mode 100644 index 0000000..7024c1c --- /dev/null +++ b/be/src/kudu/util/countdown_latch.h @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_COUNTDOWN_LATCH_H +#define KUDU_UTIL_COUNTDOWN_LATCH_H + +#include "kudu/gutil/macros.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/thread_restrictions.h" + +namespace kudu { + +// This is a C++ implementation of the Java CountDownLatch +// class. +// See http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html +class CountDownLatch { + public: + // Initialize the latch with the given initial count.
+ explicit CountDownLatch(int count) + : cond_(&lock_), + count_(count) { + } + + // Decrement the count of this latch by 'amount' + // If the new count is less than or equal to zero, then all waiting threads are woken up. + // If the count is already zero, this has no effect. + void CountDown(int amount) { + DCHECK_GE(amount, 0); + MutexLock lock(lock_); + if (count_ == 0) { + return; + } + + if (amount >= count_) { + count_ = 0; + } else { + count_ -= amount; + } + + if (count_ == 0) { + // Latch has triggered. + cond_.Broadcast(); + } + } + + // Decrement the count of this latch. + // If the new count is zero, then all waiting threads are woken up. + // If the count is already zero, this has no effect. + void CountDown() { + CountDown(1); + } + + // Wait until the count on the latch reaches zero. + // If the count is already zero, this returns immediately. + void Wait() const { + ThreadRestrictions::AssertWaitAllowed(); + MutexLock lock(lock_); + while (count_ > 0) { + cond_.Wait(); + } + } + + // Waits for the count on the latch to reach zero, or until 'when' is reached. + // Returns true if the count became zero, false otherwise. + bool WaitUntil(const MonoTime& when) const { + ThreadRestrictions::AssertWaitAllowed(); + MutexLock lock(lock_); + while (count_ > 0) { + // Recompute the remaining time so spurious wakeups can't extend the wait. + if (!cond_.TimedWait(when - MonoTime::Now())) { + return count_ == 0; + } + } + return true; + } + + // Waits for the count on the latch to reach zero, or until 'delta' time elapses. + // Returns true if the count became zero, false otherwise. + bool WaitFor(const MonoDelta& delta) const { + return WaitUntil(MonoTime::Now() + delta); + } + + // Reset the latch with the given count. This is equivalent to reconstructing + // the latch. If 'count' is 0, and there are currently waiters, those waiters + // will be triggered as if you counted down to 0.
+ void Reset(uint64_t count) { + MutexLock lock(lock_); + count_ = count; + if (count_ == 0) { + // Awake any waiters if we reset to 0. + cond_.Broadcast(); + } + } + + uint64_t count() const { + MutexLock lock(lock_); + return count_; + } + + private: + DISALLOW_COPY_AND_ASSIGN(CountDownLatch); + mutable Mutex lock_; + ConditionVariable cond_; + + uint64_t count_; +}; + +// Utility class which calls latch->CountDown() in its destructor. +class CountDownOnScopeExit { + public: + explicit CountDownOnScopeExit(CountDownLatch *latch) : latch_(latch) {} + ~CountDownOnScopeExit() { + latch_->CountDown(); + } + + private: + DISALLOW_COPY_AND_ASSIGN(CountDownOnScopeExit); + + CountDownLatch *latch_; +}; + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/cow_object.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/cow_object.h b/be/src/kudu/util/cow_object.h new file mode 100644 index 0000000..10c019e --- /dev/null +++ b/be/src/kudu/util/cow_object.h @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+#ifndef KUDU_UTIL_COW_OBJECT_H +#define KUDU_UTIL_COW_OBJECT_H + +#include <glog/logging.h> +#include <algorithm> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/util/rwc_lock.h" + +namespace kudu { + +// An object which manages its state via copy-on-write. +// +// Access to this object can be done more conveniently using the +// CowLock template class defined below. +// +// The 'State' template parameter must be copy-constructible and swappable using std::swap. +template<class State> +class CowObject { + public: + CowObject() {} + ~CowObject() {} + + void ReadLock() const { + lock_.ReadLock(); + } + + void ReadUnlock() const { + lock_.ReadUnlock(); + } + + // Lock the object for write (preventing concurrent mutators), and make a safe + // copy of the object to mutate. + void StartMutation() { + lock_.WriteLock(); + // Clone our object. + dirty_state_.reset(new State(state_)); + } + + // Abort the current mutation. This drops the write lock without applying any + // changes made to the mutable copy. + void AbortMutation() { + dirty_state_.reset(); + lock_.WriteUnlock(); + } + + // Commit the current mutation. This escalates to the "Commit" lock, which + // blocks any concurrent readers or writers, swaps in the new version of the + // State, and then drops the commit lock. + void CommitMutation() { + lock_.UpgradeToCommitLock(); + CHECK(dirty_state_); + std::swap(state_, *dirty_state_); + dirty_state_.reset(); + lock_.CommitUnlock(); + } + + // Return the current state, not reflecting any in-progress mutations. + State& state() { + DCHECK(lock_.HasReaders() || lock_.HasWriteLock()); + return state_; + } + + const State& state() const { + DCHECK(lock_.HasReaders() || lock_.HasWriteLock()); + return state_; + } + + // Returns the current dirty state (i.e. reflecting in-progress mutations). + // Should only be called by a thread who previously called StartMutation().
+ State* mutable_dirty() { + DCHECK(lock_.HasWriteLock()); + return DCHECK_NOTNULL(dirty_state_.get()); + } + + const State& dirty() const { + return *DCHECK_NOTNULL(dirty_state_.get()); + } + + private: + mutable RWCLock lock_; + + State state_; + gscoped_ptr<State> dirty_state_; + + DISALLOW_COPY_AND_ASSIGN(CowObject); +}; + +// A lock-guard-like scoped object to acquire the lock on a CowObject, +// and obtain a pointer to the correct copy to read/write. +// +// Example usage: +// +// CowObject<Foo> my_obj; +// { +// CowLock<Foo> l(&my_obj, CowLock<Foo>::READ); +// l.data().get_foo(); +// ... +// } +// { +// CowLock<Foo> l(&my_obj, CowLock<Foo>::WRITE); +// l.mutable_data()->set_foo(...); +// ... +// l.Commit(); +// } +template<class State> +class CowLock { + public: + enum LockMode { + READ, WRITE, RELEASED + }; + + // Lock in either read or write mode. + CowLock(CowObject<State>* cow, + LockMode mode) + : cow_(cow), + mode_(mode) { + if (mode == READ) { + cow_->ReadLock(); + } else if (mode_ == WRITE) { + cow_->StartMutation(); + } else { + LOG(FATAL) << "Cannot lock in mode " << mode; + } + } + + // Lock in read mode. + // A const object may not be locked in write mode. + CowLock(const CowObject<State>* info, + LockMode mode) + : cow_(const_cast<CowObject<State>*>(info)), + mode_(mode) { + if (mode == READ) { + cow_->ReadLock(); + } else if (mode_ == WRITE) { + LOG(FATAL) << "Cannot write-lock a const pointer"; + } else { + LOG(FATAL) << "Cannot lock in mode " << mode; + } + } + + // Commit the underlying object. + // Requires that the caller hold the lock in write mode. + void Commit() { + DCHECK_EQ(WRITE, mode_); + cow_->CommitMutation(); + mode_ = RELEASED; + } + + void Unlock() { + if (mode_ == READ) { + cow_->ReadUnlock(); + } else if (mode_ == WRITE) { + cow_->AbortMutation(); + } else { + DCHECK_EQ(RELEASED, mode_); + } + mode_ = RELEASED; + } + + // Obtain the underlying data.
In WRITE mode, this returns the + // same data as mutable_data() (not the safe unchanging copy). + const State& data() const { + if (mode_ == READ) { + return cow_->state(); + } else if (mode_ == WRITE) { + return cow_->dirty(); + } else { + LOG(FATAL) << "Cannot access data after committing"; + } + } + + // Obtain the mutable data. This may only be called in WRITE mode. + State* mutable_data() { + if (mode_ == READ) { + LOG(FATAL) << "Cannot mutate data with READ lock"; + } else if (mode_ == WRITE) { + return cow_->mutable_dirty(); + } else { + LOG(FATAL) << "Cannot access data after committing"; + } + } + + bool is_write_locked() const { + return mode_ == WRITE; + } + + // Drop the lock. If the lock is held in WRITE mode, and the + // lock has not yet been released, aborts the mutation, discarding + // the mutable copy so the underlying object keeps its original data. + ~CowLock() { + Unlock(); + } + + private: + CowObject<State>* cow_; + LockMode mode_; + DISALLOW_COPY_AND_ASSIGN(CowLock); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_COW_OBJECT_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc-test.cc b/be/src/kudu/util/crc-test.cc new file mode 100644 index 0000000..2c6db4b --- /dev/null +++ b/be/src/kudu/util/crc-test.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/crc.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +namespace kudu { +namespace crc { + +using strings::Substitute; + +class CrcTest : public KuduTest { + protected: + + // Stores newly allocated benchmark data in '*bufptr' (caller must delete[] it) and its size in '*buflen'. + static void GenerateBenchmarkData(const uint8_t** bufptr, size_t* buflen) { + const uint32_t kNumNumbers = 1000000; + const uint32_t kBytesPerNumber = sizeof(uint32_t); + const uint32_t kLength = kNumNumbers * kBytesPerNumber; + auto buf = new uint8_t[kLength]; + for (uint32_t i = 0; i < kNumNumbers; i++) { + memcpy(buf + (i * kBytesPerNumber), &i, kBytesPerNumber); + } + *bufptr = buf; + *buflen = kLength; + } + +}; + +// Basic functionality test. +TEST_F(CrcTest, TestCRC32C) { + const string test_data("abcdefgh"); + const uint64_t kExpectedCrc = 0xa9421b7; // Known value from crcutil usage test program.
+ + Crc* crc32c = GetCrc32cInstance(); + uint64_t data_crc = 0; + crc32c->Compute(test_data.data(), test_data.length(), &data_crc); + char buf[kFastToBufferSize]; + const char* output = FastHex64ToBuffer(data_crc, buf); + LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (full 64 bits)"; + output = FastHex32ToBuffer(static_cast<uint32_t>(data_crc), buf); + LOG(INFO) << "CRC32C of " << test_data << " is: 0x" << output << " (truncated 32 bits)"; + ASSERT_EQ(kExpectedCrc, data_crc); + + // Using helper + uint64_t data_crc2 = Crc32c(test_data.data(), test_data.length()); + ASSERT_EQ(kExpectedCrc, data_crc2); + + // Using multiple chunks (relies on test_data having an even length) + size_t half_length = test_data.length() / 2; + uint64_t data_crc3 = Crc32c(test_data.data(), half_length); + data_crc3 = Crc32c(test_data.data() + half_length, half_length, data_crc3); + ASSERT_EQ(kExpectedCrc, data_crc3); +} + +// Simple benchmark of CRC32C throughput. +// We should expect about 8 bytes per cycle in throughput on a single core.
+TEST_F(CrcTest, BenchmarkCRC32C) { + gscoped_ptr<const uint8_t[]> data; + const uint8_t* buf; + size_t buflen; + GenerateBenchmarkData(&buf, &buflen); + data.reset(buf); + Crc* crc32c = GetCrc32cInstance(); + int kNumRuns = 1000; + if (AllowSlowTests()) { + kNumRuns = 40000; + } + const uint64_t kNumBytes = kNumRuns * buflen; + Stopwatch sw; + sw.start(); + for (int i = 0; i < kNumRuns; i++) { + uint64_t cksum = 0; // Compute() reads *cksum as the starting CRC value. + crc32c->Compute(buf, buflen, &cksum); + } + sw.stop(); + CpuTimes elapsed = sw.elapsed(); + LOG(INFO) << Substitute("$0 runs of CRC32C on $1 bytes of data (total: $2 bytes)" + " in $3 seconds; $4 bytes per millisecond, $5 bytes per nanosecond!", + kNumRuns, buflen, kNumBytes, elapsed.wall_seconds(), + (kNumBytes / elapsed.wall_millis()), + (kNumBytes / elapsed.wall)); +} + +} // namespace crc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc.cc b/be/src/kudu/util/crc.cc new file mode 100644 index 0000000..1534b8d --- /dev/null +++ b/be/src/kudu/util/crc.cc @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+#include "kudu/util/crc.h" + +#include <crcutil/interface.h> + +#include "kudu/gutil/once.h" +#include "kudu/util/debug/leakcheck_disabler.h" + +namespace kudu { +namespace crc { + +using debug::ScopedLeakCheckDisabler; + +static GoogleOnceType crc32c_once = GOOGLE_ONCE_INIT; +static Crc* crc32c_instance = nullptr; + +static void InitCrc32cInstance() { + ScopedLeakCheckDisabler disabler; // The singleton CRC instance is intentionally never freed. + // TODO: Is initial = 0 and roll window = 4 appropriate for all cases? + crc32c_instance = crcutil_interface::CRC::CreateCrc32c(true, 0, 4, nullptr); +} + +Crc* GetCrc32cInstance() { + GoogleOnceInit(&crc32c_once, &InitCrc32cInstance); + return crc32c_instance; +} + +uint32_t Crc32c(const void* data, size_t length) { + uint64_t crc32 = 0; + GetCrc32cInstance()->Compute(data, length, &crc32); + return static_cast<uint32_t>(crc32); // Only uses lower 32 bits. +} + +uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32) { + uint64_t crc_tmp = static_cast<uint64_t>(prev_crc32); + GetCrc32cInstance()->Compute(data, length, &crc_tmp); + return static_cast<uint32_t>(crc_tmp); // Only uses lower 32 bits. +} + +} // namespace crc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/crc.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/crc.h b/be/src/kudu/util/crc.h new file mode 100644 index 0000000..a5db4ea --- /dev/null +++ b/be/src/kudu/util/crc.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_CRC_H_ +#define KUDU_UTIL_CRC_H_ + +#include <stdint.h> +#include <stdlib.h> + +#include <crcutil/interface.h> + +namespace kudu { +namespace crc { + +typedef crcutil_interface::CRC Crc; + +// Returns a pointer to the lazily created, process-wide CRC32C implementation. +Crc* GetCrc32cInstance(); + +// Helper function to simply calculate a CRC32C of the given data. +uint32_t Crc32c(const void* data, size_t length); + +// Given the CRC value of the previous chunk of data, +// extends it over the new chunk and returns the result. +uint32_t Crc32c(const void* data, size_t length, uint32_t prev_crc32); + +} // namespace crc +} // namespace kudu + +#endif // KUDU_UTIL_CRC_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/curl_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/curl_util.cc b/be/src/kudu/util/curl_util.cc new file mode 100644 index 0000000..6211834 --- /dev/null +++ b/be/src/kudu/util/curl_util.cc @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/curl_util.h" + + +#include <curl/curl.h> +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/security/openssl_util.h" + +namespace kudu { + +namespace { + +inline Status TranslateError(CURLcode code) { + if (code == CURLE_OK) { + return Status::OK(); + } + return Status::NetworkError("curl error", curl_easy_strerror(code)); +} + +extern "C" { +size_t WriteCallback(void* buffer, size_t size, size_t nmemb, void* user_ptr) { + size_t real_size = size * nmemb; + faststring* buf = reinterpret_cast<faststring*>(user_ptr); + CHECK_NOTNULL(buf)->append(reinterpret_cast<const uint8_t*>(buffer), real_size); + return real_size; +} +} // extern "C" + +} // anonymous namespace + +EasyCurl::EasyCurl() { + // Use our own SSL initialization, and disable curl's. + // Both of these calls are idempotent. 
+ security::InitializeOpenSSL(); + CHECK_EQ(0, curl_global_init(CURL_GLOBAL_DEFAULT & ~CURL_GLOBAL_SSL)); + curl_ = curl_easy_init(); + CHECK(curl_) << "Could not init curl"; +} + +EasyCurl::~EasyCurl() { + curl_easy_cleanup(curl_); +} + +Status EasyCurl::FetchURL(const std::string& url, faststring* buf) { + return DoRequest(url, nullptr, buf); +} + +Status EasyCurl::PostToURL(const std::string& url, + const std::string& post_data, + faststring* dst) { + return DoRequest(url, &post_data, dst); +} + +Status EasyCurl::DoRequest(const std::string& url, + const std::string* post_data, + faststring* dst) { + CHECK_NOTNULL(dst)->clear(); + + RETURN_NOT_OK(TranslateError(curl_easy_setopt( + curl_, CURLOPT_SSL_VERIFYPEER, + static_cast<long>(verify_peer_)))); // NOLINT(runtime/int) + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_URL, url.c_str()))); + if (return_headers_) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_HEADER, 1))); + } + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, WriteCallback))); + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_WRITEDATA, + static_cast<void *>(dst)))); + if (post_data) { + RETURN_NOT_OK(TranslateError(curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, + post_data->c_str()))); + } + + RETURN_NOT_OK(TranslateError(curl_easy_perform(curl_))); + long rc; // NOLINT(runtime/int) curl wants a long + RETURN_NOT_OK(TranslateError(curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &rc))); + if (rc != 200) { + return Status::RemoteError(strings::Substitute("HTTP $0", rc)); + } + + return Status::OK(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/curl_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/curl_util.h b/be/src/kudu/util/curl_util.h new file mode 100644 index 0000000..797c8a6 --- /dev/null +++ b/be/src/kudu/util/curl_util.h @@ -0,0 +1,80 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_CURL_UTIL_H +#define KUDU_UTIL_CURL_UTIL_H + +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/util/status.h" + +typedef void CURL; + +namespace kudu { + +class faststring; + +// Simple wrapper around curl's "easy" interface, allowing the user to +// fetch web pages into memory using a blocking API. +// +// This is not thread-safe. +class EasyCurl { + public: + EasyCurl(); + ~EasyCurl(); + + // Fetch the given URL into the provided buffer. + // Any existing data in the buffer is replaced. + Status FetchURL(const std::string& url, + faststring* dst); + + // Issue an HTTP POST to the given URL with the given data. + // Returns results in 'dst' as above. + Status PostToURL(const std::string& url, + const std::string& post_data, + faststring* dst); + + // Set whether to verify the server's SSL certificate in the case of an HTTPS + // connection. + void set_verify_peer(bool verify) { + verify_peer_ = verify; + } + + void set_return_headers(bool v) { + return_headers_ = v; + } + + private: + // Do a request. If 'post_data' is non-NULL, does a POST. + // Otherwise, does a GET. 
+ Status DoRequest(const std::string& url, + const std::string* post_data, + faststring* dst); + CURL* curl_; + + // Whether to verify the server certificate. + bool verify_peer_ = true; + + // Whether to return the HTTP headers with the response. + bool return_headers_ = false; + + DISALLOW_COPY_AND_ASSIGN(EasyCurl); +}; + +} // namespace kudu + +#endif /* KUDU_UTIL_CURL_UTIL_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug-util-test.cc b/be/src/kudu/util/debug-util-test.cc new file mode 100644 index 0000000..74b5b79 --- /dev/null +++ b/be/src/kudu/util/debug-util-test.cc @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include <glog/stl_logging.h>
+#include <signal.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture for debug-util. It currently adds nothing on top of KuduTest.
+class DebugUtilTest : public KuduTest {
+};
+
+// Collecting and symbolizing the current stack should name this test body.
+// NOTE(review): the meaning of the argument to Collect() (presumably a
+// number of frames to skip) is defined in debug-util.h; confirm there.
+TEST_F(DebugUtilTest, TestStackTrace) {
+  StackTrace t;
+  t.Collect(1);
+  string trace = t.Symbolize();
+  ASSERT_STR_CONTAINS(trace, "kudu::DebugUtilTest_TestStackTrace_Test::TestBody");
+}
+
+// DumpThreadStack is only supported on Linux, since the implementation relies
+// on the tgkill syscall which is not portable.
+#if defined(__linux__)
+
+namespace {
+// Thread body that blocks until 'l' is counted down. It gives the tests a
+// live thread whose stack should contain "SleeperThread".
+void SleeperThread(CountDownLatch* l) {
+  // We use an infinite loop around WaitFor() instead of a normal Wait()
+  // so that this test passes in TSAN. Without this, we run into this TSAN
+  // bug which prevents the sleeping thread from handling signals:
+  // https://code.google.com/p/thread-sanitizer/issues/detail?id=91
+  while (!l->WaitFor(MonoDelta::FromMilliseconds(10))) {
+  }
+}
+
+// No-op handler. TestSignalStackTrace installs it on SIGHUP so that SIGHUP
+// is already taken when SetStackTraceSignal(SIGHUP) is attempted.
+void fake_signal_handler(int signum) {}
+
+// Returns true if 'signum' currently has any disposition other than SIG_DFL.
+// SIG_IGN therefore also counts as "registered". CHECK-fails if sigaction()
+// itself fails.
+bool IsSignalHandlerRegistered(int signum) {
+  struct sigaction cur_action;
+  CHECK_EQ(0, sigaction(signum, nullptr, &cur_action));
+  return cur_action.sa_handler != SIG_DFL;
+}
+} // anonymous namespace
+
+// Dumping a thread id that does not belong to this process should report
+// that the signal could not be delivered.
+// NOTE(review): tid 1 is presumably init, which is never a thread of the
+// test process.
+TEST_F(DebugUtilTest, TestStackTraceInvalidTid) {
+  string s = DumpThreadStack(1);
+  ASSERT_STR_CONTAINS(s, "unable to deliver signal");
+}
+
+// A thread can dump its own stack.
+TEST_F(DebugUtilTest, TestStackTraceSelf) {
+  string s = DumpThreadStack(Thread::CurrentThreadId());
+  ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceSelf_Test::TestBody()");
+}
+
+// On Linux the main thread's tid equals the process id, so getpid() targets
+// the thread running this test.
+TEST_F(DebugUtilTest, TestStackTraceMainThread) {
+  string s = DumpThreadStack(getpid());
+  ASSERT_STR_CONTAINS(s, "kudu::DebugUtilTest_TestStackTraceMainThread_Test::TestBody()");
+}
+
+// Exercises SetStackTraceSignal(). The test moves stack dumping from SIGUSR2
+// to SIGHUP and back, then checks that installing it fails when SIGHUP
+// already has a foreign handler.
+// NOTE(review): the ASSERTs return early on failure. A failure between
+// signal(SIGHUP, &fake_signal_handler) and signal(SIGHUP, SIG_DFL) leaves the
+// fake handler installed. A failure before the final
+// SetStackTraceSignal(SIGUSR2) leaves stack dumping on a different signal
+// for later tests. Consider restoring both via MakeScopedCleanup.
+TEST_F(DebugUtilTest, TestSignalStackTrace) {
+  CountDownLatch l(1);
+  scoped_refptr<Thread> t;
+  ASSERT_OK(Thread::Create("test", "test thread", &SleeperThread, &l, &t));
+  auto cleanup_thr = MakeScopedCleanup([&]() {
+    // Allow the thread to finish.
+    l.CountDown();
+    t->Join();
+  });
+
+  // We have to loop a little bit because it takes a little while for the thread
+  // to start up and actually call our function.
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+  });
+
+  // Test that we can change the signal and that the stack traces still work,
+  // on the new signal.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+  ASSERT_OK(SetStackTraceSignal(SIGHUP));
+
+  // Should now be registered.
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGHUP));
+
+  // SIGUSR2 should be relinquished.
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGUSR2));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Switch back to SIGUSR2 and ensure it changes back.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+  ASSERT_TRUE(IsSignalHandlerRegistered(SIGUSR2));
+  ASSERT_FALSE(IsSignalHandlerRegistered(SIGHUP));
+
+  // Stack traces should work using the new handler.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "SleeperThread");
+
+  // Register our own signal handler on SIGHUP, and ensure that
+  // we get a bad Status if we try to use it.
+  signal(SIGHUP, &fake_signal_handler);
+  ASSERT_STR_CONTAINS(SetStackTraceSignal(SIGHUP).ToString(),
+                      "unable to install signal handler");
+  signal(SIGHUP, SIG_DFL);
+
+  // Stack traces should be disabled. The failed SetStackTraceSignal() call
+  // above is expected to leave no stack-trace handler installed.
+  ASSERT_STR_CONTAINS(DumpThreadStack(t->tid()), "unable to take thread stack");
+
+  // Re-enable so that other tests pass.
+  ASSERT_OK(SetStackTraceSignal(SIGUSR2));
+}
+
+// Test which dumps all known threads within this process.
+// We don't validate the results in any way -- but this verifies that we can
+// dump library threads such as the libc timer_thread and properly time out.
+TEST_F(DebugUtilTest, TestDumpAllThreads) {
+  vector<pid_t> thread_ids;
+  ASSERT_OK(ListThreads(&thread_ids));
+  // Only exercises the dump path: each stack is logged, never inspected.
+  for (const auto& thread_id : thread_ids) {
+    const string stack = DumpThreadStack(thread_id);
+    LOG(INFO) << stack;
+  }
+}
+#endif  // defined(__linux__)
+
+} // namespace kudu
