http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/object_pool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/object_pool.h b/be/src/kudu/util/object_pool.h new file mode 100644 index 0000000..147363f --- /dev/null +++ b/be/src/kudu/util/object_pool.h @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Simple pool/freelist for objects of the same type, typically used +// in local context. +#ifndef KUDU_UTIL_OBJECT_POOL_H +#define KUDU_UTIL_OBJECT_POOL_H + +#include <glog/logging.h> +#include <stdint.h> +#include "kudu/gutil/manual_constructor.h" +#include "kudu/gutil/gscoped_ptr.h" + +namespace kudu { + +using base::ManualConstructor; + +template<class T> +class ReturnToPool; + +// An object pool allocates and destroys a single class of objects +// off of a free-list. +// +// Upon destruction of the pool, any objects allocated from this pool are +// destroyed, regardless of whether they have been explicitly returned to the +// pool. 
+// +// This class is similar to the boost::pool::object_pool, except that the boost +// implementation seems to have O(n) deallocation performance and benchmarked +// really poorly. +// +// This class is not thread-safe. +template<typename T> +class ObjectPool { + public: + typedef ReturnToPool<T> deleter_type; + typedef gscoped_ptr<T, deleter_type> scoped_ptr; + + ObjectPool() : + free_list_head_(NULL), + alloc_list_head_(NULL), + deleter_(this) { + } + + ~ObjectPool() { + // Delete all objects ever allocated from this pool + ListNode *node = alloc_list_head_; + while (node != NULL) { + ListNode *tmp = node; + node = node->next_on_alloc_list; + if (!tmp->is_on_freelist) { + // Have to run the actual destructor if the user forgot to free it. + tmp->Destroy(); + } + delete tmp; + } + } + + // Construct a new object instance from the pool. + T *Construct() { + ManualConstructor<T> *obj = GetObject(); + obj->Init(); + return obj->get(); + } + + template<class Arg1> + T *Construct(Arg1 arg1) { + ManualConstructor<T> *obj = GetObject(); + obj->Init(arg1); + return obj->get(); + } + + // Destroy an object, running its destructor and returning it to the + // free-list. + void Destroy(T *t) { + CHECK_NOTNULL(t); + ListNode *node = static_cast<ListNode *>( + reinterpret_cast<ManualConstructor<T> *>(t)); + + node->Destroy(); + + DCHECK(!node->is_on_freelist); + node->is_on_freelist = true; + node->next_on_free_list = free_list_head_; + free_list_head_ = node; + } + + // Create a scoped_ptr wrapper around the given pointer which came from this + // pool. + // When the scoped_ptr goes out of scope, the object will get released back + // to the pool. 
+ scoped_ptr make_scoped_ptr(T *ptr) { + return scoped_ptr(ptr, deleter_); + } + + private: + class ListNode : ManualConstructor<T> { + friend class ObjectPool<T>; + + ListNode *next_on_free_list; + ListNode *next_on_alloc_list; + + bool is_on_freelist; + }; + + + ManualConstructor<T> *GetObject() { + if (free_list_head_ != NULL) { + ListNode *tmp = free_list_head_; + free_list_head_ = tmp->next_on_free_list; + tmp->next_on_free_list = NULL; + DCHECK(tmp->is_on_freelist); + tmp->is_on_freelist = false; + + return static_cast<ManualConstructor<T> *>(tmp); + } + auto new_node = new ListNode(); + new_node->next_on_free_list = NULL; + new_node->next_on_alloc_list = alloc_list_head_; + new_node->is_on_freelist = false; + alloc_list_head_ = new_node; + return new_node; + } + + // Keeps track of free objects in this pool. + ListNode *free_list_head_; + + // Keeps track of all objects ever allocated by this pool. + ListNode *alloc_list_head_; + + deleter_type deleter_; +}; + +// Functor which returns the passed objects to a specific object pool. +// This can be used in conjunction with scoped_ptr to automatically release +// an object back to a pool when it goes out of scope. +template<class T> +class ReturnToPool { + public: + explicit ReturnToPool(ObjectPool<T> *pool) : + pool_(pool) { + } + + inline void operator()(T *ptr) const { + pool_->Destroy(ptr); + } + + private: + ObjectPool<T> *pool_; +}; + + +} // namespace kudu +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/oid_generator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/oid_generator-test.cc b/be/src/kudu/util/oid_generator-test.cc new file mode 100644 index 0000000..a38b496 --- /dev/null +++ b/be/src/kudu/util/oid_generator-test.cc @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/oid_generator.h" + +#include <gtest/gtest.h> +#include <string> + +#include "kudu/util/test_util.h" + +using std::string; + +namespace kudu { + +TEST(ObjectIdGeneratorTest, TestCanoicalizeUuid) { + ObjectIdGenerator gen; + const string kExpectedCanonicalized = "0123456789abcdef0123456789abcdef"; + string canonicalized; + Status s = gen.Canonicalize("not_a_uuid", &canonicalized); + { + SCOPED_TRACE(s.ToString()); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), "invalid uuid"); + } + ASSERT_OK(gen.Canonicalize( + "01234567-89ab-cdef-0123-456789abcdef", &canonicalized)); + ASSERT_EQ(kExpectedCanonicalized, canonicalized); + ASSERT_OK(gen.Canonicalize( + "0123456789abcdef0123456789abcdef", &canonicalized)); + ASSERT_EQ(kExpectedCanonicalized, canonicalized); + ASSERT_OK(gen.Canonicalize( + "0123456789AbCdEf0123456789aBcDeF", &canonicalized)); + ASSERT_EQ(kExpectedCanonicalized, canonicalized); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/oid_generator.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/oid_generator.cc b/be/src/kudu/util/oid_generator.cc new file mode 100644 index 0000000..580463c --- /dev/null +++ b/be/src/kudu/util/oid_generator.cc @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/oid_generator.h" + +#include <boost/uuid/uuid_generators.hpp> +#include <exception> +#include <mutex> +#include <string> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/status.h" + +using strings::Substitute; + +namespace kudu { + +namespace { + +string ConvertUuidToString(const boost::uuids::uuid& to_convert) { + const uint8_t* uuid = to_convert.data; + return StringPrintf("%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", + uuid[0], uuid[1], uuid[2], uuid[3], uuid[4], uuid[5], uuid[6], uuid[7], + uuid[8], uuid[9], uuid[10], uuid[11], uuid[12], uuid[13], uuid[14], uuid[15]); +} + +} // anonymous namespace + +string ObjectIdGenerator::Next() { + std::lock_guard<LockType> l(oid_lock_); + boost::uuids::uuid uuid = oid_generator_(); + return ConvertUuidToString(uuid); +} + +Status ObjectIdGenerator::Canonicalize(const string& input, + string* output) const { + try { + boost::uuids::uuid uuid = oid_validator_(input); + *output = ConvertUuidToString(uuid); + return Status::OK(); + } catch (std::exception& e) { + return Status::InvalidArgument(Substitute("invalid uuid $0: $1", + input, e.what())); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/oid_generator.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/oid_generator.h b/be/src/kudu/util/oid_generator.h new file mode 100644 index 0000000..7acccc9 --- /dev/null +++ 
b/be/src/kudu/util/oid_generator.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_UTIL_OID_GENERATOR_H +#define KUDU_UTIL_OID_GENERATOR_H + +#include <boost/uuid/uuid_generators.hpp> +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/util/locks.h" +#include "kudu/util/status.h" + +namespace kudu { + +// Generates a unique 32byte id, based on uuid v4. +// This class is thread safe +class ObjectIdGenerator { + public: + ObjectIdGenerator() {} + ~ObjectIdGenerator() {} + + // Generates and returns a new UUID. + std::string Next(); + + // Validates an existing UUID and converts it into the format used by Kudu + // (that is, 16 hexadecimal bytes without any dashes). + Status Canonicalize(const std::string& input, std::string* output) const; + + private: + DISALLOW_COPY_AND_ASSIGN(ObjectIdGenerator); + + typedef simple_spinlock LockType; + + // Protects 'oid_generator_'. + LockType oid_lock_; + + // Generates new UUIDs. + boost::uuids::random_generator oid_generator_; + + // Validates provided UUIDs. 
+ boost::uuids::string_generator oid_validator_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/once-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/once-test.cc b/be/src/kudu/util/once-test.cc new file mode 100644 index 0000000..8ccd8b6 --- /dev/null +++ b/be/src/kudu/util/once-test.cc @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/once.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/thread.h" + +using std::vector; +using strings::Substitute; + +namespace kudu { + +namespace { + +struct Thing { + explicit Thing(bool should_fail) + : should_fail_(should_fail), + value_(0) { + } + + Status Init() { + return once_.Init(&Thing::InitOnce, this); + } + + Status InitOnce() { + if (should_fail_) { + return Status::IllegalState("Whoops!"); + } + value_ = 1; + return Status::OK(); + } + + const bool should_fail_; + int value_; + KuduOnceDynamic once_; +}; + +} // anonymous namespace + +TEST(TestOnce, KuduOnceDynamicTest) { + { + Thing t(false); + ASSERT_EQ(0, t.value_); + ASSERT_FALSE(t.once_.initted()); + + for (int i = 0; i < 2; i++) { + ASSERT_OK(t.Init()); + ASSERT_EQ(1, t.value_); + ASSERT_TRUE(t.once_.initted()); + } + } + + { + Thing t(true); + for (int i = 0; i < 2; i++) { + ASSERT_TRUE(t.Init().IsIllegalState()); + ASSERT_EQ(0, t.value_); + ASSERT_TRUE(t.once_.initted()); + } + } +} + +static void InitOrGetInitted(Thing* t, int i) { + if (i % 2 == 0) { + LOG(INFO) << "Thread " << i << " initting"; + t->Init(); + } else { + LOG(INFO) << "Thread " << i << " value: " << t->once_.initted(); + } +} + +TEST(TestOnce, KuduOnceDynamicThreadSafeTest) { + Thing thing(false); + + // The threads will read and write to thing.once_.initted. If access to + // it is not synchronized, TSAN will flag the access as data races. 
+ vector<scoped_refptr<Thread> > threads; + for (int i = 0; i < 10; i++) { + scoped_refptr<Thread> t; + ASSERT_OK(Thread::Create("test", Substitute("thread $0", i), + &InitOrGetInitted, &thing, i, &t)); + threads.push_back(t); + } + + for (const scoped_refptr<Thread>& t : threads) { + t->Join(); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/once.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/once.cc b/be/src/kudu/util/once.cc new file mode 100644 index 0000000..fada777 --- /dev/null +++ b/be/src/kudu/util/once.cc @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/once.h" + +#include "kudu/util/malloc.h" + +namespace kudu { + +size_t KuduOnceDynamic::memory_footprint_excluding_this() const { + return status_.memory_footprint_excluding_this(); +} + +size_t KuduOnceDynamic::memory_footprint_including_this() const { + return kudu_malloc_usable_size(this) + memory_footprint_excluding_this(); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/once.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/once.h b/be/src/kudu/util/once.h new file mode 100644 index 0000000..da26107 --- /dev/null +++ b/be/src/kudu/util/once.h @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_ONCE_H +#define KUDU_UTIL_ONCE_H + +#include <stddef.h> + +#include "kudu/gutil/once.h" +#include "kudu/util/atomic.h" +#include "kudu/util/status.h" + +namespace kudu { + +class KuduOnceDynamic; + +namespace internal { + +// Cheap, single-arg "bound callback" (similar to kudu::Callback) for use +// in KuduOnceDynamic. 
+template<typename T> +struct MemberFunc { + KuduOnceDynamic* once; + T* instance; + Status (T::*member_func)(); +}; + +template<typename T> +void InitCb(void* arg) { + MemberFunc<T>* mf = reinterpret_cast<MemberFunc<T>*>(arg); + mf->once->status_ = (mf->instance->*mf->member_func)(); + mf->once->set_initted(); +} + +} // namespace internal + +// More versatile version of GoogleOnceDynamic, including the following: +// 1. Can be used with single-arg, non-static member functions. +// 2. Retains results and overall initialization state for repeated access. +// 3. Access to initialization state is safe for concurrent use. +class KuduOnceDynamic { + public: + KuduOnceDynamic() + : initted_(false) { + } + + // If the underlying GoogleOnceDynamic has yet to be invoked, invokes the + // provided member function and stores its return value. Otherwise, + // returns the stored Status. + // + // T: the type of the member passed in. + template<typename T> + Status Init(Status (T::*member_func)(), T* instance) { + internal::MemberFunc<T> mf = { this, instance, member_func }; + + // Clang UBSAN doesn't like it when GoogleOnceDynamic handles the cast + // of the argument: + // + // runtime error: call to function + // kudu::cfile::BloomFileReader::InitOnceCb(kudu::cfile::BloomFileReader*) + // through pointer to incorrect function type 'void (*)(void *)' + // + // So let's do the cast ourselves, to void* here and back in InitCb(). + once_.Init(&internal::InitCb<T>, reinterpret_cast<void*>(&mf)); + return status_; + } + + // kMemOrderAcquire ensures that loads/stores that come after initted() + // aren't reordered to come before it instead. kMemOrderRelease ensures + // the opposite (i.e. loads/stores before set_initted() aren't reordered + // to come after it). + // + // Taken together, threads can safely synchronize on initted_. + bool initted() const { return initted_.Load(kMemOrderAcquire); } + + // Returns the memory usage of this object without the object itself. 
Should + // be used when embedded inside another object. + size_t memory_footprint_excluding_this() const; + + // Returns the memory usage of this object including the object itself. + // Should be used when allocated on the heap. + size_t memory_footprint_including_this() const; + + private: + template<typename T> + friend void internal::InitCb(void* arg); + + void set_initted() { initted_.Store(true, kMemOrderRelease); } + + AtomicBool initted_; + GoogleOnceDynamic once_; + Status status_; +}; + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/os-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/os-util-test.cc b/be/src/kudu/util/os-util-test.cc new file mode 100644 index 0000000..b7a59d5 --- /dev/null +++ b/be/src/kudu/util/os-util-test.cc @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/os-util.h" + +#include <gtest/gtest.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/errno.h" +#include "kudu/util/test_macros.h" + +using std::string; + +namespace kudu { + +void RunTest(const string& name, int user_ticks, int kernel_ticks, int io_wait) { + string buf = strings::Substitute(string("0 ($0) S 0 0 0 0 0 0 0") + + " 0 0 0 $1 $2 0 0 0 0 0" + + " 0 0 0 0 0 0 0 0 0 0 " + + " 0 0 0 0 0 0 0 0 0 0 " + + " 0 $3 0 0 0 0 0 0 0 0 " + + " 0 0", + name, user_ticks, kernel_ticks, io_wait); + ThreadStats stats; + string extracted_name; + ASSERT_OK(ParseStat(buf, &extracted_name, &stats)); + ASSERT_EQ(name, extracted_name); + ASSERT_EQ(user_ticks * (1e9 / sysconf(_SC_CLK_TCK)), stats.user_ns); + ASSERT_EQ(kernel_ticks * (1e9 / sysconf(_SC_CLK_TCK)), stats.kernel_ns); + ASSERT_EQ(io_wait * (1e9 / sysconf(_SC_CLK_TCK)), stats.iowait_ns); +} + +TEST(OsUtilTest, TestSelf) { + RunTest("test", 111, 222, 333); +} + +TEST(OsUtilTest, TestSelfNameWithSpace) { + RunTest("a space", 111, 222, 333); +} + +TEST(OsUtilTest, TestSelfNameWithParens) { + RunTest("a(b(c((d))e)", 111, 222, 333); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/os-util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/os-util.cc b/be/src/kudu/util/os-util.cc new file mode 100644 index 0000000..8874bd8 --- /dev/null +++ b/be/src/kudu/util/os-util.cc @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Imported from Impala. Changes include: +// - Namespace and imports. +// - Replaced GetStrErrMsg with ErrnoToString. +// - Replaced StringParser with strings/numbers. +// - Fixes for cpplint. +// - Fixed parsing when thread names have spaces. + +#include "kudu/util/os-util.h" + +#include <fcntl.h> +#include <fstream> +#include <sstream> +#include <string> +#include <sys/resource.h> +#include <vector> +#include <unistd.h> + +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/errno.h" + +using std::ifstream; +using std::istreambuf_iterator; +using std::ostringstream; +using strings::Split; +using strings::Substitute; + +namespace kudu { + +// Ensure that Impala compiles on earlier kernels. If the target kernel does not support +// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1. +#ifndef _SC_CLK_TCK +#define _SC_CLK_TCK 2 +#endif + +static const int64_t TICKS_PER_SEC = sysconf(_SC_CLK_TCK); + +// Offsets into the ../stat file array of per-thread statistics. +// +// They are themselves offset by two because the pid and comm fields of the +// file are parsed separately. +static const int64_t USER_TICKS = 13 - 2; +static const int64_t KERNEL_TICKS = 14 - 2; +static const int64_t IO_WAIT = 41 - 2; + +// Largest offset we are interested in, to check we get a well formed stat file. 
+static const int64_t MAX_OFFSET = IO_WAIT; + +Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats) { + DCHECK(stats != nullptr); + + // The thread name should be the only field with parentheses. But the name + // itself may contain parentheses. + size_t open_paren = buffer.find('('); + size_t close_paren = buffer.rfind(')'); + if (open_paren == string::npos || // '(' must exist + close_paren == string::npos || // ')' must exist + open_paren >= close_paren || // '(' must come before ')' + close_paren + 2 == buffer.size()) { // there must be at least two chars after ')' + return Status::IOError("Unrecognised /proc format"); + } + string extracted_name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1)); + string rest = buffer.substr(close_paren + 2); + vector<string> splits = Split(rest, " ", strings::SkipEmpty()); + if (splits.size() < MAX_OFFSET) { + return Status::IOError("Unrecognised /proc format"); + } + + int64 tmp; + if (safe_strto64(splits[USER_TICKS], &tmp)) { + stats->user_ns = tmp * (1e9 / TICKS_PER_SEC); + } + if (safe_strto64(splits[KERNEL_TICKS], &tmp)) { + stats->kernel_ns = tmp * (1e9 / TICKS_PER_SEC); + } + if (safe_strto64(splits[IO_WAIT], &tmp)) { + stats->iowait_ns = tmp * (1e9 / TICKS_PER_SEC); + } + if (name != nullptr) { + *name = extracted_name; + } + return Status::OK(); + +} + +Status GetThreadStats(int64_t tid, ThreadStats* stats) { + DCHECK(stats != nullptr); + if (TICKS_PER_SEC <= 0) { + return Status::NotSupported("ThreadStats not supported"); + } + + ostringstream proc_path; + proc_path << "/proc/self/task/" << tid << "/stat"; + ifstream proc_file(proc_path.str().c_str()); + if (!proc_file.is_open()) { + return Status::IOError("Could not open ifstream"); + } + + string buffer((istreambuf_iterator<char>(proc_file)), + istreambuf_iterator<char>()); + + return ParseStat(buffer, nullptr, stats); // don't want the name +} + +void DisableCoreDumps() { + struct rlimit lim; + 
PCHECK(getrlimit(RLIMIT_CORE, &lim) == 0); + lim.rlim_cur = 0; + PCHECK(setrlimit(RLIMIT_CORE, &lim) == 0); + + // Set coredump_filter to not dump any parts of the address space. + // Although the above disables core dumps to files, if core_pattern + // is set to a pipe rather than a file, it's not sufficient. Setting + // this pattern results in piping a very minimal dump into the core + // processor (eg abrtd), thus speeding up the crash. + int f = open("/proc/self/coredump_filter", O_WRONLY); + if (f >= 0) { + write(f, "00000000", 8); + close(f); + } +} + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/os-util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/os-util.h b/be/src/kudu/util/os-util.h new file mode 100644 index 0000000..a5f80f7 --- /dev/null +++ b/be/src/kudu/util/os-util.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Imported from Impala. Changes include: +// - Namespace + imports. +// - Fixes for cpplint. +// - Fixed parsing when thread names have spaces. 
+ +#ifndef KUDU_UTIL_OS_UTIL_H +#define KUDU_UTIL_OS_UTIL_H + +#include <string> + +#include "kudu/util/status.h" + +namespace kudu { + +// Utility methods to read interesting values from /proc. +// TODO: Get stats for parent process. + +// Container struct for statistics read from the /proc filesystem for a thread. +struct ThreadStats { + int64_t user_ns; + int64_t kernel_ns; + int64_t iowait_ns; + + // Default constructor zeroes all members in case structure can't be filled by + // GetThreadStats. + ThreadStats() : user_ns(0), kernel_ns(0), iowait_ns(0) { } +}; + +// Populates ThreadStats object using a given buffer. The buffer is expected to +// conform to /proc/<pid>/task/<tid>/stat layout; an error will be returned otherwise. +// +// If 'name' is supplied, the extracted thread name will be written to it. +Status ParseStat(const std::string&buffer, std::string* name, ThreadStats* stats); + +// Populates ThreadStats object for a given thread by reading from +// /proc/<pid>/task/<tid>/stat. Returns OK unless the file cannot be read or is in an +// unrecognised format, or if the kernel version is not modern enough. +Status GetThreadStats(int64_t tid, ThreadStats* stats); + +// Disable core dumps for this process. +// +// This is useful particularly in tests where we have injected failures and don't +// want to generate a core dump from an "expected" crash. +void DisableCoreDumps(); + +} // namespace kudu + +#endif /* KUDU_UTIL_OS_UTIL_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/path_util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/path_util-test.cc b/be/src/kudu/util/path_util-test.cc new file mode 100644 index 0000000..0d617fc --- /dev/null +++ b/be/src/kudu/util/path_util-test.cc @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/util/path_util.h" + +using std::string; +using std::vector; + +namespace kudu { + +TEST(TestPathUtil, BaseNameTest) { + ASSERT_EQ(".", BaseName("")); + ASSERT_EQ(".", BaseName(".")); + ASSERT_EQ("..", BaseName("..")); + ASSERT_EQ("/", BaseName("/")); + ASSERT_EQ("/", BaseName("//")); + ASSERT_EQ("a", BaseName("a")); + ASSERT_EQ("ab", BaseName("ab")); + ASSERT_EQ("ab", BaseName("ab/")); + ASSERT_EQ("cd", BaseName("ab/cd")); + ASSERT_EQ("ab", BaseName("/ab")); + ASSERT_EQ("ab", BaseName("/ab///")); + ASSERT_EQ("cd", BaseName("/ab/cd")); +} + +TEST(TestPathUtil, DirNameTest) { + ASSERT_EQ(".", DirName("")); + ASSERT_EQ(".", DirName(".")); + ASSERT_EQ(".", DirName("..")); + ASSERT_EQ("/", DirName("/")); +#if defined(__linux__) + // On OS X this test case returns "/", while Linux returns "//". On both + // platforms dirname(1) returns "/". The difference is unlikely to matter in + // practice. 
+ ASSERT_EQ("//", DirName("//")); +#else + ASSERT_EQ("/", DirName("//")); +#endif // defined(__linux__) + ASSERT_EQ(".", DirName("a")); + ASSERT_EQ(".", DirName("ab")); + ASSERT_EQ(".", DirName("ab/")); + ASSERT_EQ("ab", DirName("ab/cd")); + ASSERT_EQ("/", DirName("/ab")); + ASSERT_EQ("/", DirName("/ab///")); + ASSERT_EQ("/ab", DirName("/ab/cd")); +} + +TEST(TestPathUtil, SplitPathTest) { + typedef vector<string> Vec; + ASSERT_EQ(Vec({"/"}), SplitPath("/")); + ASSERT_EQ(Vec({"/", "a", "b"}), SplitPath("/a/b")); + ASSERT_EQ(Vec({"/", "a", "b"}), SplitPath("/a/b/")); + ASSERT_EQ(Vec({"a", "b"}), SplitPath("a/b")); + ASSERT_EQ(Vec({"."}), SplitPath(".")); + ASSERT_EQ(Vec(), SplitPath("")); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/path_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/path_util.cc b/be/src/kudu/util/path_util.cc new file mode 100644 index 0000000..6a095cc --- /dev/null +++ b/be/src/kudu/util/path_util.cc @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/path_util.h" + +// Use the POSIX version of dirname(3). 
+#include <libgen.h> + +#include <glog/logging.h> +#include <string> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/split.h" + +#if defined(__APPLE__) +#include <mutex> +#endif // defined(__APPLE__) + +using std::string; +using std::vector; +using strings::SkipEmpty; +using strings::Split; + +namespace kudu { + +const char kTmpInfix[] = ".kudutmp"; +const char kOldTmpInfix[] = ".tmp"; + +std::string JoinPathSegments(const std::string &a, + const std::string &b) { + CHECK(!a.empty()) << "empty first component: " << a; + CHECK(!b.empty() && b[0] != '/') + << "second path component must be non-empty and relative: " + << b; + if (a[a.size() - 1] == '/') { + return a + b; + } else { + return a + "/" + b; + } +} + +vector<string> SplitPath(const string& path) { + if (path.empty()) return {}; + vector<string> segments; + if (path[0] == '/') segments.push_back("/"); + vector<StringPiece> pieces = Split(path, "/", SkipEmpty()); + for (const StringPiece& piece : pieces) { + segments.emplace_back(piece.data(), piece.size()); + } + return segments; +} + +string DirName(const string& path) { + gscoped_ptr<char[], FreeDeleter> path_copy(strdup(path.c_str())); +#if defined(__APPLE__) + static std::mutex lock; + std::lock_guard<std::mutex> l(lock); +#endif // defined(__APPLE__) + return ::dirname(path_copy.get()); +} + +string BaseName(const string& path) { + gscoped_ptr<char[], FreeDeleter> path_copy(strdup(path.c_str())); + return basename(path_copy.get()); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/path_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/path_util.h b/be/src/kudu/util/path_util.h new file mode 100644 index 0000000..bb5c631 --- /dev/null +++ b/be/src/kudu/util/path_util.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with file paths.
+#ifndef KUDU_UTIL_PATH_UTIL_H
+#define KUDU_UTIL_PATH_UTIL_H
+
+#include <string>
+#include <vector>
+
+namespace kudu {
+
+// Common tmp infix (".kudutmp" in the current implementation).
+extern const char kTmpInfix[];
+// Infix from versions of Kudu prior to 1.2 (".tmp" in the current
+// implementation).
+extern const char kOldTmpInfix[];
+
+// Join two path segments with the appropriate path separator,
+// if necessary.
+//
+// A '/' is inserted only when 'a' does not already end with one, e.g.
+// ("a", "b") -> "a/b" and ("a/", "b") -> "a/b". CHECK-fails if 'a' is empty,
+// or if 'b' is empty or starts with '/'.
+std::string JoinPathSegments(const std::string &a,
+                             const std::string &b);
+
+// Split a path into segments with the appropriate path separator.
+//
+// Empty segments are dropped, and an absolute path yields a leading "/"
+// segment: "/a/b/" -> {"/", "a", "b"}, "a/b" -> {"a", "b"}, "" -> {}.
+std::vector<std::string> SplitPath(const std::string& path);
+
+// Return the enclosing directory of path.
+// This is like dirname(3) but for C++ strings.
+// Examples: "/ab/cd" -> "/ab", "/ab" -> "/", "ab" -> ".", "" -> ".".
+// The result for "//" is platform-dependent ("//" on Linux, "/" elsewhere).
+std::string DirName(const std::string& path);
+
+// Return the terminal component of a path.
+// This is like basename(3) but for C++ strings.
+// Trailing slashes are ignored: "/ab///" -> "ab", "ab/cd" -> "cd",
+// "/" -> "/", "" -> ".".
+std::string BaseName(const std::string& path);
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PATH_UTIL_H */
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util-internal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-internal.cc b/be/src/kudu/util/pb_util-internal.cc
new file mode 100644
index 0000000..d891d75
--- /dev/null
+++ b/be/src/kudu/util/pb_util-internal.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/pb_util-internal.h" + +namespace kudu { +namespace pb_util { +namespace internal { + +//////////////////////////////////////////// +// SequentialFileFileInputStream +//////////////////////////////////////////// + +bool SequentialFileFileInputStream::Next(const void **data, int *size) { + if (PREDICT_FALSE(!status_.ok())) { + LOG(WARNING) << "Already failed on a previous read: " << status_.ToString(); + return false; + } + + size_t available = (buffer_used_ - buffer_offset_); + if (available > 0) { + *data = buffer_.get() + buffer_offset_; + *size = available; + buffer_offset_ += available; + total_read_ += available; + return true; + } + + Slice result(buffer_.get(), buffer_size_); + status_ = rfile_->Read(&result); + if (!status_.ok()) { + LOG(WARNING) << "Read at " << buffer_offset_ << " failed: " << status_.ToString(); + return false; + } + + buffer_used_ = result.size(); + buffer_offset_ = buffer_used_; + total_read_ += buffer_used_; + *data = buffer_.get(); + *size = buffer_used_; + return buffer_used_ > 0; +} + +bool SequentialFileFileInputStream::Skip(int count) { + CHECK_GT(count, 0); + int avail = (buffer_used_ - buffer_offset_); + if (avail > count) { + buffer_offset_ += count; + total_read_ += count; + } else { + buffer_used_ = 0; + buffer_offset_ = 0; + status_ = rfile_->Skip(count - avail); + total_read_ += count - avail; + } + return status_.ok(); +} + +//////////////////////////////////////////// +// WritableFileOutputStream +//////////////////////////////////////////// + +bool WritableFileOutputStream::Next(void **data, int *size) { + if (PREDICT_FALSE(!status_.ok())) { + LOG(WARNING) << "Already failed on a previous write: " << status_.ToString(); + return false; + } + + size_t available = (buffer_size_ - buffer_offset_); + if (available > 0) { + *data = buffer_.get() + buffer_offset_; + *size = available; + buffer_offset_ += available; + return true; + } + + if (!Flush()) { + return false; + } + + buffer_offset_ = 
buffer_size_; + *data = buffer_.get(); + *size = buffer_size_; + return true; +} + +} // namespace internal +} // namespace pb_util +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util-internal.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util-internal.h b/be/src/kudu/util/pb_util-internal.h new file mode 100644 index 0000000..770d730 --- /dev/null +++ b/be/src/kudu/util/pb_util-internal.h @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Classes used internally by pb_util.h. +// This header should not be included by anything but pb_util and its tests. 
+#ifndef KUDU_UTIL_PB_UTIL_INTERNAL_H +#define KUDU_UTIL_PB_UTIL_INTERNAL_H + +#include <memory> + +#include <glog/logging.h> +#include <google/protobuf/io/zero_copy_stream.h> + +#include "kudu/util/env.h" + +namespace kudu { +namespace pb_util { +namespace internal { + +// Input Stream used by ParseFromSequentialFile() +class SequentialFileFileInputStream : public google::protobuf::io::ZeroCopyInputStream { + public: + explicit SequentialFileFileInputStream(SequentialFile *rfile, + size_t buffer_size = kDefaultBufferSize) + : buffer_used_(0), buffer_offset_(0), + buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]), + total_read_(0), rfile_(rfile) { + CHECK_GT(buffer_size, 0); + } + + ~SequentialFileFileInputStream() { + } + + bool Next(const void **data, int *size) OVERRIDE; + bool Skip(int count) OVERRIDE; + + void BackUp(int count) OVERRIDE { + CHECK_GE(count, 0); + CHECK_LE(count, buffer_offset_); + buffer_offset_ -= count; + total_read_ -= count; + } + + int64 ByteCount() const OVERRIDE { + return total_read_; + } + + Status status() const { + return status_; + } + + private: + static const size_t kDefaultBufferSize = 8192; + + Status status_; + + size_t buffer_used_; + size_t buffer_offset_; + const size_t buffer_size_; + std::unique_ptr<uint8_t[]> buffer_; + + size_t total_read_; + SequentialFile *rfile_; +}; + +// Output Stream used by SerializeToWritableFile() +class WritableFileOutputStream : public google::protobuf::io::ZeroCopyOutputStream { + public: + explicit WritableFileOutputStream(WritableFile *wfile, size_t buffer_size = kDefaultBufferSize) + : buffer_offset_(0), buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]), + flushed_(0), wfile_(wfile) { + CHECK_GT(buffer_size, 0); + } + + ~WritableFileOutputStream() { + } + + bool Flush() { + if (buffer_offset_ > 0) { + Slice data(buffer_.get(), buffer_offset_); + status_ = wfile_->Append(data); + flushed_ += buffer_offset_; + buffer_offset_ = 0; + } + return status_.ok(); + } + + bool 
Next(void **data, int *size) OVERRIDE; + + void BackUp(int count) OVERRIDE { + CHECK_GE(count, 0); + CHECK_LE(count, buffer_offset_); + buffer_offset_ -= count; + } + + int64 ByteCount() const OVERRIDE { + return flushed_ + buffer_offset_; + } + + private: + static const size_t kDefaultBufferSize = 8192; + + Status status_; + + size_t buffer_offset_; + const size_t buffer_size_; + std::unique_ptr<uint8_t[]> buffer_; + + size_t flushed_; + WritableFile *wfile_; +}; + +} // namespace internal +} // namespace pb_util +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util-test.cc b/be/src/kudu/util/pb_util-test.cc new file mode 100644 index 0000000..304e910 --- /dev/null +++ b/be/src/kudu/util/pb_util-test.cc @@ -0,0 +1,612 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <sys/types.h> +#include <unistd.h> + +#include <memory> +#include <string> +#include <vector> + +#include <gflags/gflags_declare.h> +#include <google/protobuf/descriptor.pb.h> +#include <gtest/gtest.h> + +#include "kudu/util/env_util.h" +#include "kudu/util/pb_util-internal.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/pb_util_test.pb.h" +#include "kudu/util/proto_container_test.pb.h" +#include "kudu/util/proto_container_test2.pb.h" +#include "kudu/util/proto_container_test3.pb.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +namespace kudu { +namespace pb_util { + +using google::protobuf::FileDescriptorSet; +using internal::WritableFileOutputStream; +using std::ostringstream; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; + +static const char* kTestFileName = "pb_container.meta"; +static const char* kTestKeyvalName = "my-key"; +static const int kTestKeyvalValue = 1; +static const int kUseDefaultVersion = 0; // Use the default container version (don't set it). + +class TestPBUtil : public KuduTest { + public: + virtual void SetUp() OVERRIDE { + KuduTest::SetUp(); + path_ = GetTestPath(kTestFileName); + } + + protected: + // Create a container file with expected values. + // Since this is a unit test class, and we want it to be fast, we do not + // fsync by default. + Status CreateKnownGoodContainerFile(CreateMode create = OVERWRITE, + SyncMode sync = NO_SYNC); + + // Create a new Protobuf Container File Writer. + // Set version to kUseDefaultVersion to use the default version. + Status NewPBCWriter(int version, RWFileOptions opts, + unique_ptr<WritablePBContainerFile>* pb_writer); + + // Same as CreateKnownGoodContainerFile(), but with settable file version. + // Set version to kUseDefaultVersion to use the default version. 
+ Status CreateKnownGoodContainerFileWithVersion(int version, + CreateMode create = OVERWRITE, + SyncMode sync = NO_SYNC); + + // XORs the data in the specified range of the file at the given path. + Status BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length); + + void DumpPBCToString(const string& path, bool oneline_output, string* ret); + + // Truncate the specified file to the specified length. + Status TruncateFile(const string& path, uint64_t size); + + // Output file name for most unit tests. + string path_; +}; + +// Parameterized test class for running tests across various versions of PB +// container files. +class TestPBContainerVersions : public TestPBUtil, + public ::testing::WithParamInterface<int> { + public: + TestPBContainerVersions() + : version_(GetParam()) { + } + + protected: + const int version_; // The parameterized container version we are testing. +}; + +INSTANTIATE_TEST_CASE_P(SupportedVersions, TestPBContainerVersions, + ::testing::Values(1, 2, kUseDefaultVersion)); + +Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode sync) { + ProtoContainerTestPB test_pb; + test_pb.set_name(kTestKeyvalName); + test_pb.set_value(kTestKeyvalValue); + return WritePBContainerToPath(env_, path_, test_pb, create, sync); +} + +Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts, + unique_ptr<WritablePBContainerFile>* pb_writer) { + unique_ptr<RWFile> writer; + RETURN_NOT_OK(env_->NewRWFile(opts, path_, &writer)); + pb_writer->reset(new WritablePBContainerFile(std::move(writer))); + if (version != kUseDefaultVersion) { + (*pb_writer)->SetVersionForTests(version); + } + return Status::OK(); +} + +Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version, + CreateMode create, + SyncMode sync) { + ProtoContainerTestPB test_pb; + test_pb.set_name(kTestKeyvalName); + test_pb.set_value(kTestKeyvalValue); + + unique_ptr<WritablePBContainerFile> pb_writer; + RETURN_NOT_OK(NewPBCWriter(version, 
RWFileOptions(), &pb_writer)); + RETURN_NOT_OK(pb_writer->CreateNew(test_pb)); + RETURN_NOT_OK(pb_writer->Append(test_pb)); + RETURN_NOT_OK(pb_writer->Close()); + return Status::OK(); +} + +Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length) { + faststring buf; + // Read the data from disk. + { + unique_ptr<RandomAccessFile> file; + RETURN_NOT_OK(env_->NewRandomAccessFile(path, &file)); + uint64_t size; + RETURN_NOT_OK(file->Size(&size)); + faststring scratch; + scratch.resize(size); + Slice slice(scratch.data(), size); + RETURN_NOT_OK(file->Read(0, &slice)); + buf.append(slice.data(), slice.size()); + } + + // Flip the bits. + for (uint64_t i = 0; i < length; i++) { + uint8_t* addr = buf.data() + offset + i; + *addr = ~*addr; + } + + // Write the data back to disk. + unique_ptr<WritableFile> file; + RETURN_NOT_OK(env_->NewWritableFile(path, &file)); + RETURN_NOT_OK(file->Append(buf)); + RETURN_NOT_OK(file->Close()); + + return Status::OK(); +} + +Status TestPBUtil::TruncateFile(const string& path, uint64_t size) { + unique_ptr<RWFile> file; + RWFileOptions opts; + opts.mode = Env::OPEN_EXISTING; + RETURN_NOT_OK(env_->NewRWFile(opts, path, &file)); + RETURN_NOT_OK(file->Truncate(size)); + return Status::OK(); +} + +TEST_F(TestPBUtil, TestWritableFileOutputStream) { + shared_ptr<WritableFile> file; + string path = GetTestPath("test.out"); + ASSERT_OK(env_util::OpenFileForWrite(env_, path, &file)); + + WritableFileOutputStream stream(file.get(), 4096); + + void* buf; + int size; + + // First call should yield the whole buffer. 
+ ASSERT_TRUE(stream.Next(&buf, &size)); + ASSERT_EQ(4096, size); + ASSERT_EQ(4096, stream.ByteCount()); + + // Backup 1000 and the next call should yield 1000 + stream.BackUp(1000); + ASSERT_EQ(3096, stream.ByteCount()); + + ASSERT_TRUE(stream.Next(&buf, &size)); + ASSERT_EQ(1000, size); + + // Another call should flush and yield a new buffer of 4096 + ASSERT_TRUE(stream.Next(&buf, &size)); + ASSERT_EQ(4096, size); + ASSERT_EQ(8192, stream.ByteCount()); + + // Should be able to backup to 7192 + stream.BackUp(1000); + ASSERT_EQ(7192, stream.ByteCount()); + + // Flushing shouldn't change written count. + ASSERT_TRUE(stream.Flush()); + ASSERT_EQ(7192, stream.ByteCount()); + + // Since we just flushed, we should get another full buffer. + ASSERT_TRUE(stream.Next(&buf, &size)); + ASSERT_EQ(4096, size); + ASSERT_EQ(7192 + 4096, stream.ByteCount()); + + ASSERT_TRUE(stream.Flush()); + + ASSERT_EQ(stream.ByteCount(), file->Size()); +} + +// Basic read/write test. +TEST_F(TestPBUtil, TestPBContainerSimple) { + // Exercise both the SYNC and NO_SYNC codepaths, despite the fact that we + // aren't able to observe a difference in the test. + vector<SyncMode> modes = { SYNC, NO_SYNC }; + for (SyncMode mode : modes) { + + // Write the file. + ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE, mode)); + + // Read it back, should validate and contain the expected values. + ProtoContainerTestPB test_pb; + ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb)); + ASSERT_EQ(kTestKeyvalName, test_pb.name()); + ASSERT_EQ(kTestKeyvalValue, test_pb.value()); + + // Delete the file. + ASSERT_OK(env_->DeleteFile(path_)); + } +} + +// Corruption / various failure mode test. +TEST_P(TestPBContainerVersions, TestCorruption) { + // Test that we indicate when the file does not exist. 
+ ProtoContainerTestPB test_pb; + Status s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsNotFound()) << "Should not be found: " << path_ << ": " << s.ToString(); + + // Test that an empty file looks like corruption. + { + // Create the empty file. + unique_ptr<WritableFile> file; + ASSERT_OK(env_->NewWritableFile(path_, &file)); + ASSERT_OK(file->Close()); + } + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsIncomplete()) << "Should be zero length: " << path_ << ": " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid"); + + // Test truncated file. + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + uint64_t known_good_size = 0; + ASSERT_OK(env_->GetFileSize(path_, &known_good_size)); + ASSERT_OK(TruncateFile(path_, known_good_size - 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + if (version_ == 1) { + ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString(); + } else { + ASSERT_TRUE(s.IsIncomplete()) << "Should be incorrect size: " << path_ << ": " << s.ToString(); + } + ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid"); + + // Test corrupted magic. + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, 0, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsCorruption()) << "Should have invalid magic: " << path_ << ": " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Invalid magic number"); + + // Test corrupted version. 
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, 8, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsNotSupported()) << "Should have unsupported version number: " << path_ << ": " + << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), " Protobuf container has unsupported version"); + + // Test corrupted magic+version checksum (only exists in the V2+ format). + if (version_ >= 2) { + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, 12, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsCorruption()) << "Should have corrupted file header checksum: " << path_ << ": " + << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "File header checksum does not match"); + } + + // Test record corruption below. + const int kFirstRecordOffset = (version_ == 1) ? 12 : 16; + + // Test corrupted data length. + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + if (version_ == 1) { + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid"); + } else { + ASSERT_TRUE(s.IsCorruption()) << "Should be invalid data length checksum: " + << path_ << ": " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum"); + } + + // Test corrupted data (looks like bad checksum). + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset + 4, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": " + << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum"); + + // Test corrupted checksum. 
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + ASSERT_OK(BitFlipFileByteRange(path_, known_good_size - 4, 2)); + s = ReadPBContainerFromPath(env_, path_, &test_pb); + ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": " + << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum"); +} + +// Test partial record at end of file. +TEST_P(TestPBContainerVersions, TestPartialRecord) { + ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_)); + uint64_t known_good_size; + ASSERT_OK(env_->GetFileSize(path_, &known_good_size)); + ASSERT_OK(TruncateFile(path_, known_good_size - 2)); + + unique_ptr<RandomAccessFile> file; + ASSERT_OK(env_->NewRandomAccessFile(path_, &file)); + ReadablePBContainerFile pb_file(std::move(file)); + ASSERT_OK(pb_file.Open()); + ProtoContainerTestPB test_pb; + Status s = pb_file.ReadNextPB(&test_pb); + // Loop to verify that the same response is repeatably returned. + for (int i = 0; i < 2; i++) { + if (version_ == 1) { + ASSERT_TRUE(s.IsCorruption()) << s.ToString(); + } else { + ASSERT_TRUE(s.IsIncomplete()) << s.ToString(); + } + ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid"); + } + ASSERT_OK(pb_file.Close()); +} + +// Test that it is possible to append after a partial write if we truncate the +// partial record. This is only fully supported in V2+. 
+TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
+  uint64_t known_good_size;
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+
+  // Open a writer on the existing known-good file before damaging it.
+  unique_ptr<WritablePBContainerFile> writer;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->OpenExisting());
+
+  // Drop the last two bytes to simulate a partially written final record.
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+  unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile reader(std::move(file));
+  ASSERT_OK(reader.Open());
+  ProtoContainerTestPB test_pb;
+  Status s = reader.ReadNextPB(&test_pb);
+  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    return; // The rest of the test does not apply to version 1.
+  }
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+
+  // Now truncate cleanly.
+  // The test expects reader.offset() to still point at the start of the
+  // partial record after the failed read above, so truncating there leaves
+  // only complete records and the next read hits end-of-file.
+  ASSERT_OK(TruncateFile(path_, reader.offset()));
+  s = reader.ReadNextPB(&test_pb);
+  ASSERT_TRUE(s.IsEndOfFile()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Reached end of file");
+
+  // Reopen the writer to allow appending more records.
+  // Append a record and read it back.
+  // The same 'reader' is reused, so the appended record must become visible to
+  // it without reopening the file.
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->OpenExisting());
+  test_pb.set_name("hello");
+  test_pb.set_value(1);
+  ASSERT_OK(writer->Append(test_pb));
+  test_pb.Clear();
+  ASSERT_OK(reader.ReadNextPB(&test_pb));
+  ASSERT_EQ("hello", test_pb.name());
+  ASSERT_EQ(1, test_pb.value());
+}
+
+// Simple test for all versions.
+TEST_P(TestPBContainerVersions, TestSingleMessage) {
+  // The file written by CreateKnownGoodContainerFileWithVersion() holds one
+  // appended record with the known name/value.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ProtoContainerTestPB test_pb;
+  ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb));
+  ASSERT_EQ(kTestKeyvalName, test_pb.name());
+  ASSERT_EQ(kTestKeyvalValue, test_pb.value());
+}
+
+// Writes ten records that differ only in 'value', then reads until EndOfFile
+// and checks that every record comes back in order.
+TEST_P(TestPBContainerVersions, TestMultipleMessages) {
+  ProtoContainerTestPB pb;
+  pb.set_name("foo");
+  pb.set_note("bar");
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  ASSERT_OK(pb_writer->CreateNew(pb));
+
+  for (int i = 0; i < 10; i++) {
+    pb.set_value(i);
+    ASSERT_OK(pb_writer->Append(pb));
+  }
+  ASSERT_OK(pb_writer->Close());
+
+  int pbs_read = 0;
+  unique_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
+  ReadablePBContainerFile pb_reader(std::move(reader));
+  ASSERT_OK(pb_reader.Open());
+  for (int i = 0;; i++) {
+    ProtoContainerTestPB read_pb;
+    Status s = pb_reader.ReadNextPB(&read_pb);
+    if (s.IsEndOfFile()) {
+      break;
+    }
+    ASSERT_OK(s);
+    ASSERT_EQ(pb.name(), read_pb.name());
+    ASSERT_EQ(read_pb.value(), i);
+    ASSERT_EQ(pb.note(), read_pb.note());
+    pbs_read++;
+  }
+  ASSERT_EQ(10, pbs_read);
+  ASSERT_OK(pb_reader.Close());
+}
+
+// Checks that a reader opened on the file before writing sees each record as
+// soon as the writer appends it.
+TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
+  ProtoContainerTestPB pb;
+  pb.set_name("foo");
+  pb.set_note("bar");
+
+  // Open the file for writing and reading.
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  unique_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
+  ReadablePBContainerFile pb_reader(std::move(reader));
+
+  // Write the header (writer) and validate it (reader).
+  ASSERT_OK(pb_writer->CreateNew(pb));
+  ASSERT_OK(pb_reader.Open());
+
+  for (int i = 0; i < 10; i++) {
+    // Write a message and read it back.
+    pb.set_value(i);
+    ASSERT_OK(pb_writer->Append(pb));
+    ProtoContainerTestPB read_pb;
+    ASSERT_OK(pb_reader.ReadNextPB(&read_pb));
+    ASSERT_EQ(pb.name(), read_pb.name());
+    ASSERT_EQ(read_pb.value(), i);
+    ASSERT_EQ(pb.note(), read_pb.note());
+  }
+
+  // After closing the writer, the reader should be out of data.
+  // nullptr is passed because no message is expected to be parsed here.
+  ASSERT_OK(pb_writer->Close());
+  ASSERT_TRUE(pb_reader.ReadNextPB(nullptr).IsEndOfFile());
+  ASSERT_OK(pb_reader.Close());
+}
+
+// The descriptor set must contain the message's own .proto file plus every
+// direct and transitive dependency.
+TEST_F(TestPBUtil, TestPopulateDescriptorSet) {
+  {
+    // No dependencies --> just one proto.
+    ProtoContainerTestPB pb;
+    FileDescriptorSet protos;
+    WritablePBContainerFile::PopulateDescriptorSet(
+        pb.GetDescriptor()->file(), &protos);
+    ASSERT_EQ(1, protos.file_size());
+  }
+  {
+    // One direct dependency --> two protos.
+    ProtoContainerTest2PB pb;
+    FileDescriptorSet protos;
+    WritablePBContainerFile::PopulateDescriptorSet(
+        pb.GetDescriptor()->file(), &protos);
+    ASSERT_EQ(2, protos.file_size());
+  }
+  {
+    // One direct and one indirect dependency --> three protos.
+    ProtoContainerTest3PB pb;
+    FileDescriptorSet protos;
+    WritablePBContainerFile::PopulateDescriptorSet(
+        pb.GetDescriptor()->file(), &protos);
+    ASSERT_EQ(3, protos.file_size());
+  }
+}
+
+// Reads the container at 'path' and stores its Dump() output in '*ret'.
+// Uses gtest assertions, so callers should wrap it in NO_FATALS().
+void TestPBUtil::DumpPBCToString(const string& path, bool oneline_output,
+                                 string* ret) {
+  unique_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_->NewRandomAccessFile(path, &reader));
+  ReadablePBContainerFile pb_reader(std::move(reader));
+  ASSERT_OK(pb_reader.Open());
+  ostringstream oss;
+  ASSERT_OK(pb_reader.Dump(&oss, oneline_output));
+  ASSERT_OK(pb_reader.Close());
+  *ret = oss.str();
+}
+
+TEST_P(TestPBContainerVersions, TestDumpPBContainer) {
+  // Expected Dump() output with oneline_output == false.
+  const char* kExpectedOutput =
+      "Message 0\n"
+      "-------\n"
+      "record_one {\n"
+      " name: \"foo\"\n"
+      " value: 0\n"
+      "}\n"
+      "record_two {\n"
+      " record {\n"
+      " name: \"foo\"\n"
+      " value: 0\n"
+      " }\n"
+      "}\n"
+      "\n"
+      "Message 1\n"
+      "-------\n"
+      "record_one {\n"
+      " name: \"foo\"\n"
+      " value: 1\n"
+      "}\n"
+      "record_two {\n"
+      " record {\n"
+      " name: \"foo\"\n"
+      " value: 2\n"
+      " }\n"
+      "}\n\n";
+
+  // Expected Dump() output with oneline_output == true: one line per message,
+  // the message index and its contents separated by a tab.
+  const char* kExpectedOutputShort =
+      "0\trecord_one { name: \"foo\" value: 0 } record_two { record { name: \"foo\" value: 0 } }\n"
+      "1\trecord_one { name: \"foo\" value: 1 } record_two { record { name: \"foo\" value: 2 } }\n";
+
+  ProtoContainerTest3PB pb;
+  pb.mutable_record_one()->set_name("foo");
+  pb.mutable_record_two()->mutable_record()->set_name("foo");
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  ASSERT_OK(pb_writer->CreateNew(pb));
+
+  for (int i = 0; i < 2; i++) {
+    pb.mutable_record_one()->set_value(i);
+    pb.mutable_record_two()->mutable_record()->set_value(i*2);
+    ASSERT_OK(pb_writer->Append(pb));
+  }
+  ASSERT_OK(pb_writer->Close());
+
+  string output;
+  NO_FATALS(DumpPBCToString(path_, false, &output));
+  ASSERT_STREQ(kExpectedOutput, output.c_str());
+
+  NO_FATALS(DumpPBCToString(path_, true, &output));
+  ASSERT_STREQ(kExpectedOutputShort, output.c_str());
+}
+
+// NO_OVERWRITE must refuse to replace an existing file; OVERWRITE may.
+TEST_F(TestPBUtil, TestOverwriteExistingPB) {
+  ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE));
+  ASSERT_TRUE(CreateKnownGoodContainerFile(NO_OVERWRITE).IsAlreadyPresent());
+  ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+  ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+}
+
+// With --redact=log, the Secure*DebugString() helpers must hide the secure
+// fields and keep the public ones. gflags::SetCommandLineOption() returns an
+// empty string on failure, hence the ASSERT_NE("", ...) checks.
+TEST_F(TestPBUtil, TestRedaction) {
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+  TestSecurePrintingPB pb;
+
+  pb.set_insecure1("public 1");
+  pb.set_insecure2("public 2");
+  pb.set_secure1("private 1");
+  pb.set_secure2("private 2");
+  pb.add_repeated_secure("private 3");
+  pb.add_repeated_secure("private 4");
+  pb.set_insecure3("public 3");
+
+  for (auto s : {SecureDebugString(pb), SecureShortDebugString(pb)}) {
+    ASSERT_EQ(string::npos, s.find("private"));
+    ASSERT_STR_CONTAINS(s, "<redacted>");
+    ASSERT_STR_CONTAINS(s, "public 1");
+    ASSERT_STR_CONTAINS(s, "public 2");
+    ASSERT_STR_CONTAINS(s, "public 3");
+  }
+
+  // If we disable redaction, we should see the private fields.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", ""));
+  ASSERT_STR_CONTAINS(SecureDebugString(pb), "private");
+}
+
+} // namespace pb_util
+} // namespace kudu
