http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protoc-gen-insertions.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/protoc-gen-insertions.cc b/be/src/kudu/util/protoc-gen-insertions.cc new file mode 100644 index 0000000..d8769aa --- /dev/null +++ b/be/src/kudu/util/protoc-gen-insertions.cc @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Simple protoc plugin which inserts some code into the "includes" insertion point +// of each generated .pb.cc file. +// Currently, this just adds an include of protobuf-annotations.h, a file which hooks up +// the protobuf concurrency annotations to our TSAN annotations.
+#include <glog/logging.h> +#include <google/protobuf/compiler/code_generator.h> +#include <google/protobuf/compiler/plugin.h> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/io/printer.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/strip.h" +#include "kudu/gutil/strings/substitute.h" + +using google::protobuf::io::ZeroCopyOutputStream; +using google::protobuf::io::Printer; + +namespace kudu { + +static const char* const kIncludeToInsert = "#include \"kudu/util/protobuf-annotations.h\"\n"; +static const char* const kProtoExtension = ".proto"; + +class InsertAnnotations : public ::google::protobuf::compiler::CodeGenerator { + virtual bool Generate(const google::protobuf::FileDescriptor *file, + const std::string &/*param*/, + google::protobuf::compiler::GeneratorContext *gen_context, + std::string *error) const OVERRIDE { + + // Determine the file name we will substitute into. + string path_no_extension; + if (!TryStripSuffixString(file->name(), kProtoExtension, &path_no_extension)) { + *error = strings::Substitute("file name $0 did not end in $1", file->name(), kProtoExtension); + return false; + } + string pb_file = path_no_extension + ".pb.cc"; + + // Actually insert the new #include + gscoped_ptr<ZeroCopyOutputStream> inserter(gen_context->OpenForInsert(pb_file, "includes")); + Printer printer(inserter.get(), '$'); + printer.Print(kIncludeToInsert); + + if (printer.failed()) { + *error = "Failed to print to output file"; + return false; + } + + return true; + } +}; + +} // namespace kudu + +int main(int argc, char *argv[]) { + kudu::InsertAnnotations generator; + return google::protobuf::compiler::PluginMain(argc, argv, &generator); +}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pstack_watcher-test.cc b/be/src/kudu/util/pstack_watcher-test.cc new file mode 100644 index 0000000..652fec2 --- /dev/null +++ b/be/src/kudu/util/pstack_watcher-test.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/pstack_watcher.h" + +#include <gtest/gtest.h> +#include <memory> +#include <poll.h> +#include <stdio.h> +#include <unistd.h> +#include <vector> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/bitmap.h" +#include "kudu/util/env.h" +#include "kudu/util/errno.h" +#include "kudu/util/test_macros.h" + +using std::shared_ptr; +using std::string; +using strings::Substitute; + +namespace kudu { + +TEST(TestPstackWatcher, TestPstackWatcherCancellation) { + PstackWatcher watcher(MonoDelta::FromSeconds(1000000)); + watcher.Shutdown(); +} + +TEST(TestPstackWatcher, TestWait) { + PstackWatcher watcher(MonoDelta::FromMilliseconds(10)); + watcher.Wait(); +} + +TEST(TestPstackWatcher, TestDumpStacks) { + ASSERT_OK(PstackWatcher::DumpStacks()); +} + +static shared_ptr<FILE> RedirectStdout(string *temp_path) { + string temp_dir; + CHECK_OK(Env::Default()->GetTestDirectory(&temp_dir)); + *temp_path = Substitute("$0/pstack_watcher-dump.$1.txt", + temp_dir, getpid()); + return shared_ptr<FILE>( + freopen(temp_path->c_str(), "w", stdout), fclose); +} + +TEST(TestPstackWatcher, TestPstackWatcherRunning) { + string stdout_file; + int old_stdout; + CHECK_ERR(old_stdout = dup(STDOUT_FILENO)); + { + shared_ptr<FILE> out_fp = RedirectStdout(&stdout_file); + PCHECK(out_fp.get()); + PstackWatcher watcher(MonoDelta::FromMilliseconds(500)); + while (watcher.IsRunning()) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + } + CHECK_ERR(dup2(old_stdout, STDOUT_FILENO)); + PCHECK(stdout = fdopen(STDOUT_FILENO, "w")); + + faststring contents; + CHECK_OK(ReadFileToString(Env::Default(), stdout_file, &contents)); + ASSERT_STR_CONTAINS(contents.ToString(), "BEGIN STACKS"); + CHECK_ERR(unlink(stdout_file.c_str())); + ASSERT_GE(fprintf(stdout, "%s\n", contents.ToString().c_str()), 0) + << "errno=" << errno << ": " << ErrnoToString(errno); +} + +} // namespace kudu 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pstack_watcher.cc b/be/src/kudu/util/pstack_watcher.cc new file mode 100644 index 0000000..4ba7ada --- /dev/null +++ b/be/src/kudu/util/pstack_watcher.cc @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/pstack_watcher.h" + +#include <memory> +#include <stdio.h> +#include <string> +#include <sys/types.h> +#include <unistd.h> +#include <vector> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/env.h" +#include "kudu/util/errno.h" +#include "kudu/util/status.h" +#include "kudu/util/subprocess.h" + +namespace kudu { + +using std::shared_ptr; +using std::string; +using std::vector; +using strings::Substitute; + +PstackWatcher::PstackWatcher(MonoDelta timeout) + : timeout_(std::move(timeout)), running_(true), cond_(&lock_) { + CHECK_OK(Thread::Create("pstack_watcher", "pstack_watcher", + boost::bind(&PstackWatcher::Run, this), &thread_)); +} + +PstackWatcher::~PstackWatcher() { + Shutdown(); +} + +void PstackWatcher::Shutdown() { + { + MutexLock guard(lock_); + running_ = false; + cond_.Broadcast(); + } + if (thread_) { + CHECK_OK(ThreadJoiner(thread_.get()).Join()); + thread_.reset(); + } +} + +bool PstackWatcher::IsRunning() const { + MutexLock guard(lock_); + return running_; +} + +void PstackWatcher::Wait() const { + MutexLock lock(lock_); + while (running_) { + cond_.Wait(); + } +} + +void PstackWatcher::Run() { + MutexLock guard(lock_); + if (!running_) return; + cond_.TimedWait(timeout_); + if (!running_) return; + + WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher"); + running_ = false; + cond_.Broadcast(); +} + +Status PstackWatcher::HasProgram(const char* progname) { + Subprocess proc({ "which", progname } ); + proc.DisableStderr(); + proc.DisableStdout(); + RETURN_NOT_OK_PREPEND(proc.Start(), + Substitute("HasProgram($0): error running 'which'", progname)); + RETURN_NOT_OK(proc.Wait()); + int exit_status; + string exit_info; + RETURN_NOT_OK(proc.GetExitStatus(&exit_status, &exit_info)); + if (exit_status == 0) { + return Status::OK(); + } + return Status::NotFound(Substitute("can't find $0: $1", progname, exit_info)); +} + +Status PstackWatcher::DumpStacks(int flags) { + return 
DumpPidStacks(getpid(), flags); +} + +Status PstackWatcher::DumpPidStacks(pid_t pid, int flags) { + + // Prefer GDB if available; it gives us line numbers and thread names. + if (HasProgram("gdb").ok()) { + return RunGdbStackDump(pid, flags); + } + + // Otherwise, try to use pstack or gstack. + const char *progname = nullptr; + if (HasProgram("pstack").ok()) { + progname = "pstack"; + } else if (HasProgram("gstack").ok()) { + progname = "gstack"; + } + + if (!progname) { + return Status::ServiceUnavailable("Neither gdb, pstack, nor gstack appears to be installed."); + } + return RunPstack(progname, pid); +} + +Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) { + // Command: gdb -quiet -batch -nx -ex cmd1 -ex cmd2 /proc/$PID/exe $PID + vector<string> argv; + argv.push_back("gdb"); + // Don't print introductory version/copyright messages. + argv.push_back("-quiet"); + // Exit after processing all of the commands below. + argv.push_back("-batch"); + // Don't run commands from .gdbinit + argv.push_back("-nx"); + // On RHEL6 and older Ubuntu, we occasionally would see gdb spin forever + // trying to collect backtraces. Setting a backtrace limit is a reasonable + // workaround, since we don't really expect >100-deep stacks anyway. 
+ // + // See https://bugs.launchpad.net/ubuntu/+source/gdb/+bug/434168 + argv.push_back("-ex"); + argv.push_back("set backtrace limit 100"); + argv.push_back("-ex"); + argv.push_back("set print pretty on"); + argv.push_back("-ex"); + argv.push_back("info threads"); + argv.push_back("-ex"); + argv.push_back("thread apply all bt"); + if (flags & DUMP_FULL) { + argv.push_back("-ex"); + argv.push_back("thread apply all bt full"); + } + string executable; + Env* env = Env::Default(); + RETURN_NOT_OK(env->GetExecutablePath(&executable)); + argv.push_back(executable); + argv.push_back(Substitute("$0", pid)); + return RunStackDump(argv); +} + +Status PstackWatcher::RunPstack(const std::string& progname, pid_t pid) { + string pid_string(Substitute("$0", pid)); + vector<string> argv; + argv.push_back(progname); + argv.push_back(pid_string); + return RunStackDump(argv); +} + +Status PstackWatcher::RunStackDump(const vector<string>& argv) { + printf("************************ BEGIN STACKS **************************\n"); + if (fflush(stdout) == EOF) { + return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno); + } + Subprocess pstack_proc(argv); + RETURN_NOT_OK_PREPEND(pstack_proc.Start(), "RunStackDump proc.Start() failed"); + if (::close(pstack_proc.ReleaseChildStdinFd()) == -1) { + return Status::IOError("Unable to close child stdin", ErrnoToString(errno), errno); + } + RETURN_NOT_OK_PREPEND(pstack_proc.Wait(), "RunStackDump proc.Wait() failed"); + int exit_code; + string exit_info; + RETURN_NOT_OK_PREPEND(pstack_proc.GetExitStatus(&exit_code, &exit_info), + "RunStackDump proc.GetExitStatus() failed"); + if (exit_code != 0) { + return Status::RuntimeError("RunStackDump proc.Wait() error", exit_info); + } + printf("************************* END STACKS ***************************\n"); + if (fflush(stdout) == EOF) { + return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno); + } + + return Status::OK(); +} + +} // namespace kudu 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pstack_watcher.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pstack_watcher.h b/be/src/kudu/util/pstack_watcher.h new file mode 100644 index 0000000..396bf94 --- /dev/null +++ b/be/src/kudu/util/pstack_watcher.h @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_PSTACK_WATCHER_H +#define KUDU_UTIL_PSTACK_WATCHER_H + +#include <string> +#include <vector> + +#include "kudu/util/condition_variable.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +namespace kudu { + +// PstackWatcher is an object which will pstack the current process and print +// the results to stdout. It does this after a certain timeout has occurred. +class PstackWatcher { + public: + + enum Flags { + NO_FLAGS = 0, + + // When dumping with gdb, additionally run 'thread apply all bt full', + // which is very verbose output + DUMP_FULL = 1 + }; + + // Static method to collect and write stack dump output to stdout of the current + // process.
+ static Status DumpStacks(int flags = NO_FLAGS); + + // Like the above but for any process, not just the current one. + static Status DumpPidStacks(pid_t pid, int flags = NO_FLAGS); + + // Instantiate a watcher that writes a pstack to stdout after the given + // timeout expires. + explicit PstackWatcher(MonoDelta timeout); + + ~PstackWatcher(); + + // Shut down the watcher and do not log a pstack. If the watcher thread is + // already dumping stacks, this blocks until that dump finishes. + // This method is not thread-safe. + void Shutdown(); + + // Test whether the watcher is still running or has shut down. + // Thread-safe; may block while a stack dump is in progress. + bool IsRunning() const; + + // Wait until the timeout expires and the watcher has attempted to log a + // pstack, or until Shutdown() is called. + // Thread-safe. + void Wait() const; + + private: + // Test for the existence of the given program in the system path. + static Status HasProgram(const char* progname); + + // Get a stack dump using GDB directly. + static Status RunGdbStackDump(pid_t pid, int flags); + + // Get a stack dump using the pstack or gstack program. + static Status RunPstack(const std::string& progname, pid_t pid); + + // Invoke and wait for the stack dump program. + static Status RunStackDump(const std::vector<std::string>& argv); + + // Run the thread that waits for the specified duration before logging a + // pstack. + void Run(); + + const MonoDelta timeout_; + bool running_; + scoped_refptr<Thread> thread_; + // Protects 'running_'; 'cond_' is broadcast whenever 'running_' is cleared. + mutable Mutex lock_; + mutable ConditionVariable cond_; +}; + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/random-test.cc b/be/src/kudu/util/random-test.cc new file mode 100644 index 0000000..b40e90c --- /dev/null +++ b/be/src/kudu/util/random-test.cc @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <limits> +#include <unordered_set> + +#include <glog/stl_logging.h> + +#include "kudu/util/random.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +// Fixture whose 'rng_' is seeded via SeedRandom() for each test. +class RandomTest : public KuduTest { + public: + RandomTest() + : rng_(SeedRandom()) { + } + + protected: + Random rng_; +}; + +// Tests that after a certain number of invocations of Normal(), the +// actual mean of all samples is within the specified standard +// deviation of the target mean. +TEST_F(RandomTest, TestNormalDist) { + const double kMean = 5.0; + const double kStdDev = 0.01; + const int kNumIters = 100000; + + double sum = 0.0; + for (int i = 0; i < kNumIters; ++i) { + sum += rng_.Normal(kMean, kStdDev); + } + + ASSERT_LE(fabs((sum / static_cast<double>(kNumIters)) - kMean), kStdDev); +} + +// Tests that after a large number of invocations of Next32() and Next64(), we +// have flipped all the bits we claim we should have. +// +// This is a regression test for a bug where we were incorrectly bit-shifting +// in Next64(). +// +// Note: Our RNG actually only generates 31 bits of randomness for 32 bit +// integers. If all bits need to be randomized, callers must use Random::Next64(). +// This test reflects that, and if we change the RNG algo this test should also change.
+TEST_F(RandomTest, TestUseOfBits) { + // For Next32(): + uint32_t ones32 = std::numeric_limits<uint32_t>::max(); + uint32_t zeroes32 = 0; + // For Next64(): + uint64_t ones64 = std::numeric_limits<uint64_t>::max(); + uint64_t zeroes64 = 0; + + for (int i = 0; i < 10000000; i++) { + uint32_t r32 = rng_.Next32(); + ones32 &= r32; + zeroes32 |= r32; + + uint64_t r64 = rng_.Next64(); + ones64 &= r64; + zeroes64 |= r64; + } + + // At the end, we should have flipped 31 and 64 bits, respectively. One + // detail of the current RNG impl is that Next32() always returns a number + // with MSB set to 0. + uint32_t expected_bits_31 = std::numeric_limits<uint32_t>::max() >> 1; + uint64_t expected_bits_64 = std::numeric_limits<uint64_t>::max(); + + ASSERT_EQ(0, ones32); + ASSERT_EQ(expected_bits_31, zeroes32); + ASSERT_EQ(0, ones64); + ASSERT_EQ(expected_bits_64, zeroes64); +} + +// Reset() with the same seed must make Next64() return the same value again. +TEST_F(RandomTest, TestResetSeed) { + rng_.Reset(1); + uint64_t first = rng_.Next64(); + rng_.Reset(1); + uint64_t second = rng_.Next64(); + ASSERT_EQ(first, second); +} + +// ReservoirSample() should select each element roughly equally often and never +// select an element listed in 'avoid'. +TEST_F(RandomTest, TestReservoirSample) { + // Use a constant seed to avoid flakiness. + rng_.Reset(12345); + + vector<int> population; + for (int i = 0; i < 100; i++) { + population.push_back(i); + } + + // Run 1000 trials selecting 5 elements. + vector<int> results; + vector<int> counts(population.size()); + std::unordered_set<int> avoid; + for (int trial = 0; trial < 1000; trial++) { + rng_.ReservoirSample(population, 5, avoid, &results); + for (int result : results) { + counts[result]++; + } + } + + // We expect each element to be selected + // 50 times on average, but since it's random, it won't be exact. + // However, since we use a constant seed, this test won't be flaky. + for (int count : counts) { + ASSERT_GE(count, 25); + ASSERT_LE(count, 75); + } + + // Run again, but avoid some particular entries.
+ avoid.insert(3); + avoid.insert(10); + avoid.insert(20); + counts.assign(100, 0); + for (int trial = 0; trial < 1000; trial++) { + rng_.ReservoirSample(population, 5, avoid, &results); + for (int result : results) { + counts[result]++; + } + } + + // Ensure that we didn't ever pick the avoided elements. + ASSERT_EQ(0, counts[3]); + ASSERT_EQ(0, counts[10]); + ASSERT_EQ(0, counts[20]); +} + +// When k is at least the population size, ReservoirSample() returns the whole +// population in input order. +TEST_F(RandomTest, TestReservoirSamplePopulationTooSmall) { + vector<int> population; + for (int i = 0; i < 10; i++) { + population.push_back(i); + } + + vector<int> results; + std::unordered_set<int> avoid; + rng_.ReservoirSample(population, 20, avoid, &results); + ASSERT_EQ(population.size(), results.size()); + ASSERT_EQ(population, results); + + rng_.ReservoirSample(population, 10, avoid, &results); + ASSERT_EQ(population.size(), results.size()); + ASSERT_EQ(population, results); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/random.h b/be/src/kudu/util/random.h new file mode 100644 index 0000000..e31e475 --- /dev/null +++ b/be/src/kudu/util/random.h @@ -0,0 +1,252 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#ifndef KUDU_UTIL_RANDOM_H_ +#define KUDU_UTIL_RANDOM_H_ + +#include <cmath> +#include <cstdint> +#include <mutex> +#include <random> +#include <vector> + +#include "kudu/gutil/casts.h" +#include "kudu/gutil/map-util.h" +#include "kudu/util/locks.h" + +namespace kudu { + +namespace random_internal { + +static const uint32_t M = 2147483647L; // 2^31-1 + +} // namespace random_internal + +// Forward declaration; the definition below is used by Random::Normal(). +template<class R> +class StdUniformRNG; + +// A very simple random number generator.
Not especially good at +// generating truly random bits, but good enough for our needs in this +// package. This implementation is not thread-safe. +class Random { + private: + uint32_t seed_; + public: + explicit Random(uint32_t s) { + Reset(s); + } + + // Reset the RNG to the given seed value. + void Reset(uint32_t s) { + seed_ = s & 0x7fffffffu; + // Avoid bad seeds. + if (seed_ == 0 || seed_ == random_internal::M) { + seed_ = 1; + } + } + + // Next pseudo-random 32-bit unsigned integer. + // FIXME: This currently only generates 31 bits of randomness. + // The MSB will always be zero. + uint32_t Next() { + static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0 + // We are computing + // seed_ = (seed_ * A) % M, where M = 2^31-1 + // + // seed_ must not be zero or M, or else all subsequent computed values + // will be zero or M respectively. For all other values, seed_ will end + // up cycling through every number in [1,M-1] + uint64_t product = seed_ * A; + + // Compute (product % M) using the fact that ((x << 31) % M) == x. + seed_ = static_cast<uint32_t>((product >> 31) + (product & random_internal::M)); + // The first reduction may overflow by 1 bit, so we may need to + // repeat. mod == M is not possible; using > allows the faster + // sign-bit-based test. + if (seed_ > random_internal::M) { + seed_ -= random_internal::M; + } + return seed_; + } + + // Alias for consistency with Next64 + uint32_t Next32() { return Next(); } + + // Next pseudo-random 64-bit unsigned integer. + uint64_t Next64() { + uint64_t large = Next(); + large <<= 31; + large |= Next(); + // Fill in the highest two MSBs.
+ large |= implicit_cast<uint64_t>(Next32()) << 62; + return large; + } + + // Returns a uniformly distributed value in the range [0..n-1] + // REQUIRES: n > 0 + uint32_t Uniform(uint32_t n) { return Next() % n; } + + // Alias for consistency with Uniform64 + uint32_t Uniform32(uint32_t n) { return Uniform(n); } + + // Returns a uniformly distributed 64-bit value in the range [0..n-1] + // REQUIRES: n > 0 + uint64_t Uniform64(uint64_t n) { return Next64() % n; } + + // Randomly returns true ~"1/n" of the time, and false otherwise. + // REQUIRES: n > 0 + bool OneIn(int n) { return (Next() % n) == 0; } + + // Skewed: pick "base" uniformly from range [0,max_log] and then + // return "base" random bits. The effect is to pick a number in the + // range [0,2^max_log-1] with exponential bias towards smaller numbers. + // NOTE(review): '1 << ...' shifts an int, so max_log >= 32 is undefined + // behavior and max_log == 31 relies on 1 << 31 converting to 2^31. + uint32_t Skewed(int max_log) { + return Uniform(1 << Uniform(max_log + 1)); + } + + // Samples a random number from the given normal distribution. + double Normal(double mean, double std_dev); + + // Return a random number strictly between 0.0 and 1.0: Next() never returns + // 0, and its largest value, M-1, divided by M+1 (2^31) is below 1.0. + double NextDoubleFraction() { + return Next() / static_cast<double>(random_internal::M + 1.0); + } + + // Sample 'k' random elements from the collection 'c' into 'result', taking care not to sample any + // elements that are already present in 'avoid'. + // + // In the case that 'c' has fewer than 'k' elements that are not in 'avoid', then all of those + // elements will be selected. + // + // 'c' should be an iterable STL collection such as a vector, set, or list. + // 'avoid' should be an STL-compatible set. + // + // The results are not shuffled. While the reservoir is still filling, elements are appended + // in input order. Each later pick overwrites a randomly chosen slot, so once any replacement + // has happened the order no longer matches the input collection.
+ template<class Collection, class Set, class T> + void ReservoirSample(const Collection& c, int k, const Set& avoid, + std::vector<T>* result) { + result->clear(); + result->reserve(k); + int i = 0; + for (const T& elem : c) { + if (ContainsKey(avoid, elem)) { + continue; + } + i++; + // Fill the reservoir if there is available space. + if (result->size() < k) { + result->push_back(elem); + continue; + } + // Otherwise replace existing elements with decreasing probability. + int j = Uniform(i); + if (j < k) { + (*result)[j] = elem; + } + } + } +}; + +// Thread-safe wrapper around Random. Every method holds 'lock_' (a +// simple_spinlock) for the duration of the delegated call. +class ThreadSafeRandom { + public: + explicit ThreadSafeRandom(uint32_t s) + : random_(s) { + } + + void Reset(uint32_t s) { + std::lock_guard<simple_spinlock> l(lock_); + random_.Reset(s); + } + + uint32_t Next() { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Next(); + } + + uint32_t Next32() { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Next32(); + } + + uint64_t Next64() { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Next64(); + } + + uint32_t Uniform(uint32_t n) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Uniform(n); + } + + uint32_t Uniform32(uint32_t n) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Uniform32(n); + } + + uint64_t Uniform64(uint64_t n) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Uniform64(n); + } + + bool OneIn(int n) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.OneIn(n); + } + + uint32_t Skewed(int max_log) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Skewed(max_log); + } + + double Normal(double mean, double std_dev) { + std::lock_guard<simple_spinlock> l(lock_); + return random_.Normal(mean, std_dev); + } + + double NextDoubleFraction() { + std::lock_guard<simple_spinlock> l(lock_); + return random_.NextDoubleFraction(); + } + + template<class Collection, class Set, class T> + void ReservoirSample(const
Collection& c, int k, const Set& avoid, + std::vector<T>* result) { + std::lock_guard<simple_spinlock> l(lock_); + random_.ReservoirSample(c, k, avoid, result); + } + + private: + simple_spinlock lock_; + Random random_; +}; + +// Wraps either Random or ThreadSafeRandom as a C++ standard library +// compliant UniformRandomNumberGenerator: +// http://en.cppreference.com/w/cpp/concept/UniformRandomNumberGenerator +template<class R> +class StdUniformRNG { + public: + typedef uint32_t result_type; + + explicit StdUniformRNG(R* r) : r_(r) {} + uint32_t operator()() { + return r_->Next32(); + } + // NOTE(review): Next32() only returns values in [1, M-1] (see Random::Reset() + // and Random::Next()), so the advertised range [min(), max()] = [0, M] is + // slightly wider than what is actually produced. + constexpr static uint32_t min() { return 0; } + constexpr static uint32_t max() { return (1L << 31) - 1; } + + private: + R* r_; +}; + +// Defined outside the class to make use of StdUniformRNG above. +inline double Random::Normal(double mean, double std_dev) { + std::normal_distribution<> nd(mean, std_dev); + StdUniformRNG<Random> gen(this); + return nd(gen); +} + +} // namespace kudu + +#endif // KUDU_UTIL_RANDOM_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/random_util-test.cc b/be/src/kudu/util/random_util-test.cc new file mode 100644 index 0000000..f3eb7d5 --- /dev/null +++ b/be/src/kudu/util/random_util-test.cc @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/random_util.h" + +#include <algorithm> +#include <cmath> +#include <cstring> + +#include "kudu/util/random.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +class RandomUtilTest : public KuduTest { + protected: + RandomUtilTest() : rng_(SeedRandom()) {} + + Random rng_; + + static const int kLenMax = 100; + static const int kNumTrials = 100; +}; + +namespace { + +// Checks string defined at start is set to \0 everywhere but [from, to) +void CheckEmpty(char* start, int from, int to, int stop) { + DCHECK_LE(0, from); + DCHECK_LE(from, to); + DCHECK_LE(to, stop); + // The loop condition jumps j from 'from' straight to 'to', so the written + // range [from, to) is skipped. + // NOTE(review): the CHECK message fragments "...after defining" and + // "indices [..." below are joined without a separating space. + for (int j = 0; (j == from ?
j = to : j) < stop; ++j) { + CHECK_EQ(start[j], '\0') << "Index " << j << " not null after defining" + << "indices [" << from << "," << to << ") of " + << "a nulled string [0," << stop << ")."; + } +} + +} // anonymous namespace + +// Makes sure that RandomString only writes the specified amount: each trial +// fills [from, to) of a zeroed buffer and checks that every other byte is +// still '\0'. +TEST_F(RandomUtilTest, TestRandomString) { + char start[kLenMax]; + + for (int i = 0; i < kNumTrials; ++i) { + memset(start, '\0', kLenMax); + int to = rng_.Uniform(kLenMax + 1); + int from = rng_.Uniform(to + 1); + RandomString(start + from, to - from, &rng_); + CheckEmpty(start, from, to, kLenMax); + } + + // Corner case + memset(start, '\0', kLenMax); + RandomString(start, 0, &rng_); + CheckEmpty(start, 0, 0, kLenMax); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/random_util.cc b/be/src/kudu/util/random_util.cc new file mode 100644 index 0000000..21a4144 --- /dev/null +++ b/be/src/kudu/util/random_util.cc @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include "kudu/util/random_util.h" + +#include <cmath> +#include <cstdlib> +#include <cstring> +#include <sys/types.h> +#include <unistd.h> + +#include "kudu/util/env.h" +#include "kudu/util/random.h" +#include "kudu/gutil/walltime.h" + +namespace kudu { + +void RandomString(void* dest, size_t n, Random* rng) { + size_t i = 0; + uint32_t random = rng->Next(); + char* cdest = static_cast<char*>(dest); + static const size_t sz = sizeof(random); + if (n >= sz) { + for (i = 0; i <= n - sz; i += sz) { + memcpy(&cdest[i], &random, sizeof(random)); + random = rng->Next(); + } + } + memcpy(cdest + i, &random, n - i); +} + +uint32_t GetRandomSeed32() { + uint32_t seed = static_cast<uint32_t>(GetCurrentTimeMicros()); + seed *= getpid(); + seed *= Env::Default()->gettid(); + return seed; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/random_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/random_util.h b/be/src/kudu/util/random_util.h new file mode 100644 index 0000000..e286bbe --- /dev/null +++ b/be/src/kudu/util/random_util.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#ifndef KUDU_UTIL_RANDOM_UTIL_H
+#define KUDU_UTIL_RANDOM_UTIL_H
+
+#include <cstdlib>
+#include <stdint.h>
+
+namespace kudu {
+
+class Random;
+
+// Writes exactly 'n' random bytes to 'dest', drawing randomness from 'rng'.
+// 'dest' must point to at least 'n' writable bytes.
+// Note RandomString() does not null-terminate its output, and '\0' is as
+// likely to be written as any other byte, so the result is not a C string.
+void RandomString(void* dest, size_t n, Random* rng);
+
+// Generates a 32-bit seed from several sources: the current timestamp, the
+// process id and the thread id. All of these are predictable, so the seed
+// should not be relied on where unpredictability matters.
+uint32_t GetRandomSeed32();
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_RANDOM_UTIL_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/resettable_heartbeater-test.cc b/be/src/kudu/util/resettable_heartbeater-test.cc
new file mode 100644
index 0000000..0e4cf3b
--- /dev/null
+++ b/be/src/kudu/util/resettable_heartbeater-test.cc
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "kudu/util/resettable_heartbeater.h" + +#include <boost/bind.hpp> +#include <gtest/gtest.h> +#include <string> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +// Number of heartbeats we want to observe before allowing the test to end. +static const int kNumHeartbeats = 2; + +class ResettableHeartbeaterTest : public KuduTest { + public: + ResettableHeartbeaterTest() + : KuduTest(), + latch_(kNumHeartbeats) { + } + + protected: + void CreateHeartbeater(uint64_t period_ms, const std::string& name) { + period_ms_ = period_ms; + heartbeater_.reset( + new ResettableHeartbeater(name, + MonoDelta::FromMilliseconds(period_ms), + boost::bind(&ResettableHeartbeaterTest::HeartbeatFunction, + this))); + } + + Status HeartbeatFunction() { + latch_.CountDown(); + return Status::OK(); + } + + void WaitForCountDown() { + // Wait a large multiple (in the worst case) of the required time before we + // time out and fail the test. Large to avoid test flakiness. + const uint64_t kMaxWaitMillis = period_ms_ * kNumHeartbeats * 20; + CHECK(latch_.WaitFor(MonoDelta::FromMilliseconds(kMaxWaitMillis))) + << "Failed to count down " << kNumHeartbeats << " times in " << kMaxWaitMillis + << " ms: latch count == " << latch_.count(); + } + + CountDownLatch latch_; + uint64_t period_ms_; + gscoped_ptr<ResettableHeartbeater> heartbeater_; +}; + +// Tests that if Reset() is not called the heartbeat method is called +// the expected number of times. +TEST_F(ResettableHeartbeaterTest, TestRegularHeartbeats) { + const int64_t kHeartbeatPeriodMillis = 100; // Heartbeat every 100ms. 
+ CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME()); + ASSERT_OK(heartbeater_->Start()); + WaitForCountDown(); + ASSERT_OK(heartbeater_->Stop()); +} + +// Tests that if we Reset() the heartbeater in a period smaller than +// the heartbeat period the heartbeat method never gets called. +// After we stop resetting heartbeats should resume as normal +TEST_F(ResettableHeartbeaterTest, TestResetHeartbeats) { + const int64_t kHeartbeatPeriodMillis = 800; // Heartbeat every 800ms. + const int64_t kNumResetSlicesPerPeriod = 40; // Reset 40 times per heartbeat period. + // Reset once every 800ms / 40 = 20ms. + const int64_t kResetPeriodMillis = kHeartbeatPeriodMillis / kNumResetSlicesPerPeriod; + + CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME()); + ASSERT_OK(heartbeater_->Start()); + // Call Reset() in a loop for 2 heartbeat periods' worth of time, with sleeps + // in-between as defined above. + for (int i = 0; i < kNumResetSlicesPerPeriod * 2; i++) { + heartbeater_->Reset(); + ASSERT_EQ(kNumHeartbeats, latch_.count()); // Ensure we haven't counted down, yet. + SleepFor(MonoDelta::FromMilliseconds(kResetPeriodMillis)); + } + WaitForCountDown(); + ASSERT_OK(heartbeater_->Stop()); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/resettable_heartbeater.cc b/be/src/kudu/util/resettable_heartbeater.cc new file mode 100644 index 0000000..91c4587 --- /dev/null +++ b/be/src/kudu/util/resettable_heartbeater.cc @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/resettable_heartbeater.h"
+
+#include <glog/logging.h>
+#include <mutex>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+using std::string;
+
+// Implementation behind ResettableHeartbeater. It owns the background thread
+// that calls 'function_' once per 'period_'. Reset() or Stop() wake that
+// thread early by counting down 'run_latch_'.
+class ResettableHeartbeaterThread {
+ public:
+  ResettableHeartbeaterThread(std::string name, MonoDelta period,
+                              HeartbeatFunction function);
+
+  Status Start();
+  Status Stop();
+  void Reset();
+
+ private:
+  // Body of the heartbeat thread; loops until it observes 'shutdown_'.
+  void RunThread();
+  // True if the calling thread is 'thread_'.
+  bool IsCurrentThread() const;
+
+  const string name_;
+
+  // The heartbeat period.
+  const MonoDelta period_;
+
+  // The function to call to perform the heartbeat
+  const HeartbeatFunction function_;
+
+  // The actual running thread (null until Start() has created it)
+  scoped_refptr<kudu::Thread> thread_;
+
+  // Created at 0 and armed to 1 by Start(). Reset() and Stop() count it down
+  // to wake RunThread() before its wait period expires; RunThread() re-arms it
+  // to 1 after each reset.
+  CountDownLatch run_latch_;
+
+  // Whether the heartbeater should shutdown.
+  bool shutdown_;
+
+  // Protects 'shutdown_'. RunThread() also holds it while re-arming
+  // 'run_latch_'; Reset() and Stop() count the latch down without holding it.
+  mutable simple_spinlock lock_;
+  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeaterThread);
+};
+
+ResettableHeartbeater::ResettableHeartbeater(const std::string& name,
+                                             MonoDelta period,
+                                             HeartbeatFunction function)
+    : thread_(new ResettableHeartbeaterThread(name, period, function)) {
+}
+
+Status ResettableHeartbeater::Start() {
+  return thread_->Start();
+}
+
+Status ResettableHeartbeater::Stop() {
+  return thread_->Stop();
+}
+void ResettableHeartbeater::Reset() {
+  thread_->Reset();
+}
+
+// Stop() returns OK when the thread was never started or is already shut
+// down, so destroying an idle heartbeater only does work when it is running.
+ResettableHeartbeater::~ResettableHeartbeater() {
+  WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
+}
+
+ResettableHeartbeaterThread::ResettableHeartbeaterThread(
+    std::string name, MonoDelta period, HeartbeatFunction function)
+    : name_(std::move(name)),
+      period_(std::move(period)),
+      function_(std::move(function)),
+      run_latch_(0),
+      shutdown_(false) {}
+
+// Heartbeat loop. Each iteration waits on 'run_latch_' for the wait period:
+// - if the wait times out, the heartbeat function is called;
+// - if the latch was counted down (by Reset() or Stop()), the loop returns
+//   when 'shutdown_' is set; otherwise it re-arms the latch and waits again,
+//   this time for a jittered period between period_/2 and period_.
+void ResettableHeartbeaterThread::RunThread() {
+  CHECK(IsCurrentThread());
+  VLOG(1) << "Heartbeater: " << name_ << " thread starting";
+
+  bool prev_reset_was_manual = false;
+  Random rng(random());
+  while (true) {
+    MonoDelta wait_period = period_;
+    if (prev_reset_was_manual) {
+      // When the caller does a manual reset, we randomize the subsequent wait
+      // timeout between period_/2 and period_. This builds in some jitter so
+      // multiple tablets on the same TS don't end up heartbeating in lockstep.
+      int64_t half_period_ms = period_.ToMilliseconds() / 2;
+      wait_period = MonoDelta::FromMilliseconds(
+          half_period_ms +
+          rng.NextDoubleFraction() * half_period_ms);
+      prev_reset_was_manual = false;
+    }
+    if (run_latch_.WaitFor(wait_period)) {
+      // CountDownLatch reached 0 -- Reset() or Stop() counted it down.
+      prev_reset_was_manual = true;
+      std::lock_guard<simple_spinlock> lock(lock_);
+      // check if we were told to shutdown
+      if (shutdown_) {
+        // Latch fired -- exit loop
+        VLOG(1) << "Heartbeater: " << name_ << " thread finished";
+        return;
+      } else {
+        // otherwise it's just a reset, reset the latch
+        // and continue;
+        // NOTE(review): a Reset() arriving between WaitFor() returning and
+        // this Reset(1) presumably finds the latch already at 0 and is
+        // absorbed -- confirm that this is acceptable.
+        run_latch_.Reset(1);
+        continue;
+      }
+    }
+
+    // The wait timed out without a reset: send a heartbeat. A failure is only
+    // logged; the loop keeps running.
+    Status s = function_();
+    if (!s.ok()) {
+      LOG(WARNING)<< "Failed to heartbeat in heartbeater: " << name_
+                  << " Status: " << s.ToString();
+      continue;
+    }
+  }
+}
+
+bool ResettableHeartbeaterThread::IsCurrentThread() const {
+  return thread_.get() == kudu::Thread::current_thread();
+}
+
+// Arms 'run_latch_' and spawns the heartbeat thread. CHECK-fails if an earlier
+// call already created the thread.
+Status ResettableHeartbeaterThread::Start() {
+  CHECK(thread_ == nullptr);
+  run_latch_.Reset(1);
+  return kudu::Thread::Create("heartbeater", strings::Substitute("$0-heartbeat", name_),
+                              &ResettableHeartbeaterThread::RunThread,
+                              this, &thread_);
+}
+
+// Wakes the heartbeat thread so that the current period restarts (with
+// jitter). Does nothing before Start().
+// NOTE(review): reads 'thread_' without holding 'lock_'; this assumes callers
+// never race Reset() against Start() -- confirm.
+void ResettableHeartbeaterThread::Reset() {
+  if (!thread_) {
+    return;
+  }
+  run_latch_.CountDown();
+}
+
+// Signals shutdown and joins the thread. Returns OK without doing anything if
+// the thread was never started or shutdown was already requested.
+Status ResettableHeartbeaterThread::Stop() {
+  if (!thread_) {
+    return Status::OK();
+  }
+
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (shutdown_) {
+      return Status::OK();
+    }
+    shutdown_ = true;
+  }
+
+  // 'shutdown_' is already set, so the wake-up caused by this count-down finds
+  // it when RunThread() takes 'lock_'.
+  run_latch_.CountDown();
+  RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
+  return Status::OK();
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/resettable_heartbeater.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/resettable_heartbeater.h b/be/src/kudu/util/resettable_heartbeater.h
new file mode 100644
index 0000000..40bbe29
--- /dev/null
+++ b/be/src/kudu/util/resettable_heartbeater.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
+#define KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
+
+#include <boost/function.hpp>
+#include <string>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+class MonoDelta;
+class Status;
+class ResettableHeartbeaterThread;
+
+// Callback invoked for each heartbeat. A non-OK Status is logged, and the
+// heartbeater keeps running.
+typedef boost::function<Status()> HeartbeatFunction;
+
+// A resettable heartbeater that takes a function and calls
+// it to perform a regular heartbeat, unless Reset() is called
+// in which case the heartbeater resets the heartbeat period.
+// The point is to send "I'm Alive" heartbeats only if no regular
+// messages are sent in the same period.
+//
+// TODO Eventually this should be used instead of the master heartbeater
+// as it shares a lot of logic with the exception of the specific master
+// stuff (and the fact that it is resettable).
+//
+// TODO We'll have a lot of these per server, so eventually we need
+// to refactor this so that multiple heartbeaters share something like
+// Java's ScheduledExecutor.
+//
+// TODO Do something about failed heartbeats, right now this is just
+// logging. Probably could take more arguments and do more of an
+// exponential backoff.
+//
+// This class is thread safe.
+class ResettableHeartbeater {
+ public:
+  // 'name' identifies the heartbeater in log messages and in its thread's
+  // name; 'function' is invoked roughly every 'period'.
+  ResettableHeartbeater(const std::string& name,
+                        MonoDelta period,
+                        HeartbeatFunction function);
+
+  // Starts the heartbeater thread. Must be called at most once.
+  Status Start();
+
+  // Stops the heartbeater and waits for its thread to exit. Returns OK if the
+  // heartbeater was never started or has already been stopped.
+  Status Stop();
+
+  // Resets the heartbeat period. Has no effect before Start().
+  // When this is called, the subsequent heartbeat has some built-in jitter and
+  // may trigger before a full period (as specified to the constructor).
+  void Reset();
+
+  // Calls Stop(), logging a warning if it fails.
+  ~ResettableHeartbeater();
+ private:
+  gscoped_ptr<ResettableHeartbeaterThread> thread_;
+
+  DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeater);
+};
+
+}  // namespace kudu
+
+#endif /* KUDU_UTIL_RESETTABLE_HEARTBEATER_H_ */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rle-encoding.h b/be/src/kudu/util/rle-encoding.h
new file mode 100644
index 0000000..120a159
--- /dev/null
+++ b/be/src/kudu/util/rle-encoding.h
@@ -0,0 +1,523 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_RLE_ENCODING_H +#define IMPALA_RLE_ENCODING_H + +#include <glog/logging.h> + +#include "kudu/gutil/port.h" +#include "kudu/util/bit-stream-utils.inline.h" +#include "kudu/util/bit-util.h" + +namespace kudu { + +// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs +// are sufficiently long, RLE is used, otherwise, the values are just bit-packed +// (literal encoding). +// For both types of runs, there is a byte-aligned indicator which encodes the length +// of the run and the type of the run. +// This encoding has the benefit that when there aren't any long enough runs, values +// are always decoded at fixed (can be precomputed) bit offsets OR both the value and +// the run length are byte aligned. This allows for very efficient decoding +// implementations. +// The encoding is: +// encoded-block := run* +// run := literal-run | repeated-run +// literal-run := literal-indicator < literal bytes > +// repeated-run := repeated-indicator < repeated value. padded to byte boundary > +// literal-indicator := varint_encode( number_of_groups << 1 | 1) +// repeated-indicator := varint_encode( number_of_repetitions << 1 ) +// +// Each run is preceded by a varint. The varint's least significant bit is +// used to indicate whether the run is a literal run or a repeated run. The rest +// of the varint is used to determine the length of the run (eg how many times the +// value repeats). +// +// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode +// in groups of 8), so that no matter the bit-width of the value, the sequence will end +// on a byte boundary without padding. +// Given that we know it is a multiple of 8, we store the number of 8-groups rather than +// the actual number of encoded ints. (This means that the total number of encoded values +// can not be determined from the encoded data, since the number of values in the last +// group may not be a multiple of 8). 
+// There is a break-even point when it is more storage efficient to do run length +// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes +// for both the repeated encoding or the literal encoding. This value can always +// be computed based on the bit-width. +// TODO: think about how to use this for strings. The bit packing isn't quite the same. +// +// Examples with bit-width 1 (eg encoding booleans): +// ---------------------------------------- +// 100 1s followed by 100 0s: +// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> +// - (total 4 bytes) +// +// alternating 1s and 0s (200 total): +// 200 ints = 25 groups of 8 +// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> +// (total 26 bytes, 1 byte overhead) +// + +// Decoder class for RLE encoded data. +// +// NOTE: the encoded format does not have any length prefix or any other way of +// indicating that the encoded sequence ends at a certain point, so the Decoder +// methods may return some extra bits at the end before the read methods start +// to return 0/false. +template<typename T> +class RleDecoder { + public: + // Create a decoder object. buffer/buffer_len is the decoded data. + // bit_width is the width of each value (before encoding). + RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) + : bit_reader_(buffer, buffer_len), + bit_width_(bit_width), + current_value_(0), + repeat_count_(0), + literal_count_(0), + rewind_state_(CANT_REWIND) { + DCHECK_GE(bit_width_, 1); + DCHECK_LE(bit_width_, 64); + } + + RleDecoder() {} + + // Skip n values, and returns the number of non-zero entries skipped. + size_t Skip(size_t to_skip); + + // Gets the next value. Returns false if there are no more. + bool Get(T* val); + + // Seek to the previous value. + void RewindOne(); + + // Gets the next run of the same 'val'. Returns 0 if there is no + // more data to be decoded. Will return a run of at most 'max_run' + // values. 
If there are more values than this, the next call to + // GetNextRun will return more from the same run. + size_t GetNextRun(T* val, size_t max_run); + + private: + bool ReadHeader(); + + enum RewindState { + REWIND_LITERAL, + REWIND_RUN, + CANT_REWIND + }; + + BitReader bit_reader_; + int bit_width_; + uint64_t current_value_; + uint32_t repeat_count_; + uint32_t literal_count_; + RewindState rewind_state_; +}; + +// Class to incrementally build the rle data. +// The encoding has two modes: encoding repeated runs and literal runs. +// If the run is sufficiently short, it is more efficient to encode as a literal run. +// This class does so by buffering 8 values at a time. If they are not all the same +// they are added to the literal run. If they are the same, they are added to the +// repeated run. When we switch modes, the previous run is flushed out. +template<typename T> +class RleEncoder { + public: + // buffer: buffer to write bits to. + // bit_width: max number of bits for value. + // TODO: consider adding a min_repeated_run_length so the caller can control + // when values should be encoded as repeated runs. Currently this is derived + // based on the bit_width, which can determine a storage optimal choice. + explicit RleEncoder(faststring *buffer, int bit_width) + : bit_width_(bit_width), + bit_writer_(buffer) { + DCHECK_GE(bit_width_, 1); + DCHECK_LE(bit_width_, 64); + Clear(); + } + + // Reserve 'num_bytes' bytes for a plain encoded header, set each + // byte with 'val': this is used for the RLE-encoded data blocks in + // order to be able to able to store the initial ordinal position + // and number of elements. This is a part of RleEncoder in order to + // maintain the correct offset in 'buffer'. + void Reserve(int num_bytes, uint8_t val); + + // Encode value. This value must be representable with bit_width_ bits. + void Put(T value, size_t run_length = 1); + + // Flushes any pending values to the underlying buffer. 
+ // Returns the total number of bytes written + int Flush(); + + // Resets all the state in the encoder. + void Clear(); + + int32_t len() const { return bit_writer_.bytes_written(); } + + private: + // Flushes any buffered values. If this is part of a repeated run, this is largely + // a no-op. + // If it is part of a literal run, this will call FlushLiteralRun, which writes + // out the buffered literal values. + // If 'done' is true, the current run would be written even if it would normally + // have been buffered more. This should only be called at the end, when the + // encoder has received all values even if it would normally continue to be + // buffered. + void FlushBufferedValues(bool done); + + // Flushes literal values to the underlying buffer. If update_indicator_byte, + // then the current literal run is complete and the indicator byte is updated. + void FlushLiteralRun(bool update_indicator_byte); + + // Flushes a repeated run to the underlying buffer. + void FlushRepeatedRun(); + + // Number of bits needed to encode the value. + const int bit_width_; + + // Underlying buffer. + BitWriter bit_writer_; + + // We need to buffer at most 8 values for literals. This happens when the + // bit_width is 1 (so 8 values fit in one byte). + // TODO: generalize this to other bit widths + uint64_t buffered_values_[8]; + + // Number of values in buffered_values_ + int num_buffered_values_; + + // The current (also last) value that was written and the count of how + // many times in a row that value has been seen. This is maintained even + // if we are in a literal run. If the repeat_count_ get high enough, we switch + // to encoding repeated runs. + uint64_t current_value_; + int repeat_count_; + + // Number of literals in the current run. This does not include the literals + // that might be in buffered_values_. 
Only after we've got a group big enough + // can we decide if they should part of the literal_count_ or repeat_count_ + int literal_count_; + + // Index of a byte in the underlying buffer that stores the indicator byte. + // This is reserved as soon as we need a literal run but the value is written + // when the literal run is complete. We maintain an index rather than a pointer + // into the underlying buffer because the pointer value may become invalid if + // the underlying buffer is resized. + int literal_indicator_byte_idx_; +}; + +template<typename T> +inline bool RleDecoder<T>::ReadHeader() { + DCHECK(bit_reader_.is_initialized()); + if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { + // Read the next run's indicator int, it could be a literal or repeated run + // The int is encoded as a vlq-encoded value. + int32_t indicator_value = 0; + bool result = bit_reader_.GetVlqInt(&indicator_value); + if (PREDICT_FALSE(!result)) { + return false; + } + + // lsb indicates if it is a literal run or repeated run + bool is_literal = indicator_value & 1; + if (is_literal) { + literal_count_ = (indicator_value >> 1) * 8; + DCHECK_GT(literal_count_, 0); + } else { + repeat_count_ = indicator_value >> 1; + DCHECK_GT(repeat_count_, 0); + bool result = bit_reader_.GetAligned<T>( + BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(¤t_value_)); + DCHECK(result); + } + } + return true; +} + +template<typename T> +inline bool RleDecoder<T>::Get(T* val) { + DCHECK(bit_reader_.is_initialized()); + if (PREDICT_FALSE(!ReadHeader())) { + return false; + } + + if (PREDICT_TRUE(repeat_count_ > 0)) { + *val = current_value_; + --repeat_count_; + rewind_state_ = REWIND_RUN; + } else { + DCHECK(literal_count_ > 0); + bool result = bit_reader_.GetValue(bit_width_, val); + DCHECK(result); + --literal_count_; + rewind_state_ = REWIND_LITERAL; + } + + return true; +} + +template<typename T> +inline void RleDecoder<T>::RewindOne() { + DCHECK(bit_reader_.is_initialized()); + + 
switch (rewind_state_) { + case CANT_REWIND: + LOG(FATAL) << "Can't rewind more than once after each read!"; + break; + case REWIND_RUN: + ++repeat_count_; + break; + case REWIND_LITERAL: + { + bit_reader_.Rewind(bit_width_); + ++literal_count_; + break; + } + } + + rewind_state_ = CANT_REWIND; +} + +template<typename T> +inline size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { + DCHECK(bit_reader_.is_initialized()); + DCHECK_GT(max_run, 0); + size_t ret = 0; + size_t rem = max_run; + while (ReadHeader()) { + if (PREDICT_TRUE(repeat_count_ > 0)) { + if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { + return ret; + } + *val = current_value_; + if (repeat_count_ >= rem) { + // The next run is longer than the amount of remaining data + // that the caller wants to read. Only consume it partially. + repeat_count_ -= rem; + ret += rem; + return ret; + } + ret += repeat_count_; + rem -= repeat_count_; + repeat_count_ = 0; + } else { + DCHECK(literal_count_ > 0); + if (ret == 0) { + bool has_more = bit_reader_.GetValue(bit_width_, val); + DCHECK(has_more); + literal_count_--; + ret++; + rem--; + } + + while (literal_count_ > 0) { + bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); + DCHECK(result); + if (current_value_ != *val || rem == 0) { + bit_reader_.Rewind(bit_width_); + return ret; + } + ret++; + rem--; + literal_count_--; + } + } + } + return ret; + } + +template<typename T> +inline size_t RleDecoder<T>::Skip(size_t to_skip) { + DCHECK(bit_reader_.is_initialized()); + + size_t set_count = 0; + while (to_skip > 0) { + bool result = ReadHeader(); + DCHECK(result); + + if (PREDICT_TRUE(repeat_count_ > 0)) { + size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; + repeat_count_ -= nskip; + to_skip -= nskip; + if (current_value_ != 0) { + set_count += nskip; + } + } else { + DCHECK(literal_count_ > 0); + size_t nskip = (literal_count_ < to_skip) ? 
literal_count_ : to_skip; + literal_count_ -= nskip; + to_skip -= nskip; + while (nskip--) { + T value = 0; + bool result = bit_reader_.GetValue(bit_width_, &value); + DCHECK(result); + if (value != 0) { + set_count++; + } + } + } + } + return set_count; +} + +// This function buffers input values 8 at a time. After seeing all 8 values, +// it decides whether they should be encoded as a literal or repeated run. +template<typename T> +inline void RleEncoder<T>::Put(T value, size_t run_length) { + DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); + + // TODO(perf): remove the loop and use the repeat_count_ + while (run_length--) { + if (PREDICT_TRUE(current_value_ == value)) { + ++repeat_count_; + if (repeat_count_ > 8) { + // This is just a continuation of the current run, no need to buffer the + // values. + // Note that this is the fast path for long repeated runs. + continue; + } + } else { + if (repeat_count_ >= 8) { + // We had a run that was long enough but it has ended. Flush the + // current repeated run. + DCHECK_EQ(literal_count_, 0); + FlushRepeatedRun(); + } + repeat_count_ = 1; + current_value_ = value; + } + + buffered_values_[num_buffered_values_] = value; + if (++num_buffered_values_ == 8) { + DCHECK_EQ(literal_count_ % 8, 0); + FlushBufferedValues(false); + } + } +} + +template<typename T> +inline void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { + if (literal_indicator_byte_idx_ < 0) { + // The literal indicator byte has not been reserved yet, get one now. + literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); + DCHECK_GE(literal_indicator_byte_idx_, 0); + } + + // Write all the buffered values as bit packed literals + for (int i = 0; i < num_buffered_values_; ++i) { + bit_writer_.PutValue(buffered_values_[i], bit_width_); + } + num_buffered_values_ = 0; + + if (update_indicator_byte) { + // At this point we need to write the indicator byte for the literal run. 
+ // We only reserve one byte, to allow for streaming writes of literal values. + // The logic makes sure we flush literal runs often enough to not overrun + // the 1 byte. + int num_groups = BitUtil::Ceil(literal_count_, 8); + int32_t indicator_value = (num_groups << 1) | 1; + DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); + bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; + literal_indicator_byte_idx_ = -1; + literal_count_ = 0; + } +} + +template<typename T> +inline void RleEncoder<T>::FlushRepeatedRun() { + DCHECK_GT(repeat_count_, 0); + // The lsb of 0 indicates this is a repeated run + int32_t indicator_value = repeat_count_ << 1 | 0; + bit_writer_.PutVlqInt(indicator_value); + bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); + num_buffered_values_ = 0; + repeat_count_ = 0; +} + +// Flush the values that have been buffered. At this point we decide whether +// we need to switch between the run types or continue the current one. +template<typename T> +inline void RleEncoder<T>::FlushBufferedValues(bool done) { + if (repeat_count_ >= 8) { + // Clear the buffered values. They are part of the repeated run now and we + // don't want to flush them out as literals. + num_buffered_values_ = 0; + if (literal_count_ != 0) { + // There was a current literal run. All the values in it have been flushed + // but we still need to update the indicator byte. + DCHECK_EQ(literal_count_ % 8, 0); + DCHECK_EQ(repeat_count_, 8); + FlushLiteralRun(true); + } + DCHECK_EQ(literal_count_, 0); + return; + } + + literal_count_ += num_buffered_values_; + int num_groups = BitUtil::Ceil(literal_count_, 8); + if (num_groups + 1 >= (1 << 6)) { + // We need to start a new literal run because the indicator byte we've reserved + // cannot store more values. 
+ DCHECK_GE(literal_indicator_byte_idx_, 0); + FlushLiteralRun(true); + } else { + FlushLiteralRun(done); + } + repeat_count_ = 0; +} + +template<typename T> +inline void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) { + for (int i = 0; i < num_bytes; ++i) { + bit_writer_.PutValue(val, 8); + } +} + +template<typename T> +inline int RleEncoder<T>::Flush() { + if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { + bool all_repeat = literal_count_ == 0 && + (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); + // There is something pending, figure out if it's a repeated or literal run + if (repeat_count_ > 0 && all_repeat) { + FlushRepeatedRun(); + } else { + literal_count_ += num_buffered_values_; + FlushLiteralRun(true); + repeat_count_ = 0; + } + } + bit_writer_.Flush(); + DCHECK_EQ(num_buffered_values_, 0); + DCHECK_EQ(literal_count_, 0); + DCHECK_EQ(repeat_count_, 0); + return bit_writer_.bytes_written(); +} + +template<typename T> +inline void RleEncoder<T>::Clear() { + current_value_ = 0; + repeat_count_ = 0; + num_buffered_values_ = 0; + literal_count_ = 0; + literal_indicator_byte_idx_ = -1; + bit_writer_.Clear(); +} + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rle-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/rle-test.cc b/be/src/kudu/util/rle-test.cc new file mode 100644 index 0000000..185fed5 --- /dev/null +++ b/be/src/kudu/util/rle-test.cc @@ -0,0 +1,537 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+// Must come before gtest.h.
+#include "kudu/gutil/mathlimits.h"
+
+#include <boost/utility/binary.hpp>
+#include <gtest/gtest.h>
+#include <string>
+#include <vector>
+
+#include "kudu/util/rle-encoding.h"
+#include "kudu/util/bit-stream-utils.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Largest bit width exercised by the width-sweeping tests below.
+const int MAX_WIDTH = 64;
+
+class TestRle : public KuduTest {};
+
+// Writes two bytes of 1-bit values through BitWriter, checks the exact bit
+// layout of the buffer, then reads the values back with BitReader.
+TEST(BitArray, TestBool) {
+  const int len_bytes = 2;
+  faststring buffer(len_bytes);
+
+  BitWriter writer(&buffer);
+
+  // Write alternating 0's and 1's
+  for (int i = 0; i < 8; ++i) {
+    writer.PutValue(i % 2, 1);
+  }
+  writer.Flush();
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+
+  // Write 00110011
+  for (int i = 0; i < 8; ++i) {
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        writer.PutValue(0, 1);
+        break;
+      default:
+        writer.PutValue(1, 1);
+        break;
+    }
+  }
+  writer.Flush();
+
+  // Validate the exact bit value
+  EXPECT_EQ(buffer[0], BOOST_BINARY(1 0 1 0 1 0 1 0));
+  EXPECT_EQ(buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
+
+  // Use the reader and validate
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(val, i % 2);
+  }
+
+  for (int i = 0; i < 8; ++i) {
+    bool val = false;
+    bool result = reader.GetValue(1, &val);
+    EXPECT_TRUE(result);
+    switch (i) {
+      case 0:
+      case 1:
+      case 4:
+      case 5:
+        EXPECT_EQ(val, false);
+        break;
+      default:
+        EXPECT_EQ(val, true);
+        break;
+    }
+  }
+}
+
+// Writes 'num_vals' values with width 'bit_width' and reads them back.
+void TestBitArrayValues(int bit_width, int num_vals) {
+  const int kTestLen = BitUtil::Ceil(bit_width * num_vals, 8);
+  // Shift an unsigned operand: '1LL << 63' overflows a signed 64-bit integer
+  // (undefined behavior before C++14). A width of 64 uses mod == 1, so every
+  // value written is 0.
+  const uint64_t mod = bit_width == 64 ? 1 : 1ULL << bit_width;
+
+  faststring buffer(kTestLen);
+  BitWriter writer(&buffer);
+  for (int i = 0; i < num_vals; ++i) {
+    writer.PutValue(i % mod, bit_width);
+  }
+  writer.Flush();
+  EXPECT_EQ(writer.bytes_written(), kTestLen);
+
+  BitReader reader(buffer.data(), kTestLen);
+  for (int i = 0; i < num_vals; ++i) {
+    int64_t val = 0;
+    bool result = reader.GetValue(bit_width, &val);
+    EXPECT_TRUE(result);
+    // Compare as unsigned to match the type of 'i % mod'.
+    EXPECT_EQ(static_cast<uint64_t>(val), i % mod);
+  }
+  EXPECT_EQ(reader.bytes_left(), 0);
+}
+
+TEST(BitArray, TestValues) {
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    TestBitArrayValues(width, 1);
+    TestBitArrayValues(width, 2);
+    // Don't write too many values
+    TestBitArrayValues(width, (width < 12) ? (1 << width) : 4096);
+    TestBitArrayValues(width, 1024);
+  }
+}
+
+// Test some mixed values: 1-bit parity values interleaved with 10-bit values.
+TEST(BitArray, TestMixed) {
+  const int kTestLenBits = 1024;
+  faststring buffer(kTestLenBits / 8);
+  bool parity = true;
+
+  BitWriter writer(&buffer);
+  for (int i = 0; i < kTestLenBits; ++i) {
+    if (i % 2 == 0) {
+      writer.PutValue(parity, 1);
+      parity = !parity;
+    } else {
+      writer.PutValue(i, 10);
+    }
+  }
+  writer.Flush();
+
+  parity = true;
+  BitReader reader(buffer.data(), buffer.size());
+  for (int i = 0; i < kTestLenBits; ++i) {
+    bool result;
+    if (i % 2 == 0) {
+      bool val = false;
+      result = reader.GetValue(1, &val);
+      EXPECT_EQ(val, parity);
+      parity = !parity;
+    } else {
+      // Initialized so a failed GetValue() cannot leave it indeterminate.
+      int val = 0;
+      result = reader.GetValue(10, &val);
+      EXPECT_EQ(val, i);
+    }
+    EXPECT_TRUE(result);
+  }
+}
+
+// Validates encoding of values by encoding and decoding them. If
+// expected_encoding != NULL, also validates that the encoded buffer is
+// exactly 'expected_encoding'.
+// If expected_len is not -1, it will validate the encoded size is correct.
+// expected_encoding, when given, requires expected_len to be set as well.
+template<typename T>
+void ValidateRle(const vector<T>& values, int bit_width,
+                 uint8_t* expected_encoding, int expected_len) {
+  faststring buffer;
+  RleEncoder<T> encoder(&buffer, bit_width);
+
+  for (const auto& value : values) {
+    encoder.Put(value);
+  }
+  int encoded_len = encoder.Flush();
+
+  if (expected_len != -1) {
+    EXPECT_EQ(encoded_len, expected_len);
+  }
+  if (expected_encoding != nullptr) {
+    // memcmp() below needs a real length; -1 would be converted to a huge size_t.
+    ASSERT_NE(-1, expected_len) << "expected_encoding requires expected_len";
+    EXPECT_EQ(memcmp(buffer.data(), expected_encoding, expected_len), 0)
+      << "\n"
+      << "Expected: " << HexDump(Slice(expected_encoding, expected_len)) << "\n"
+      << "Got: " << HexDump(Slice(buffer));
+  }
+
+  // Verify read
+  RleDecoder<T> decoder(buffer.data(), encoded_len, bit_width);
+  for (const auto& value : values) {
+    T val = 0;
+    bool result = decoder.Get(&val);
+    EXPECT_TRUE(result);
+    EXPECT_EQ(value, val);
+  }
+}
+
+TEST(Rle, SpecificSequences) {
+  const int kTestLen = 1024;
+  uint8_t expected_buffer[kTestLen];
+  vector<uint64_t> values;
+
+  // Test 50 0's followed by 50 1's
+  values.resize(100);
+  for (int i = 0; i < 50; ++i) {
+    values[i] = 0;
+  }
+  for (int i = 50; i < 100; ++i) {
+    values[i] = 1;
+  }
+
+  // expected_buffer valid for bit width <= 1 byte
+  expected_buffer[0] = (50 << 1);
+  expected_buffer[1] = 0;
+  expected_buffer[2] = (50 << 1);
+  expected_buffer[3] = 1;
+  for (int width = 1; width <= 8; ++width) {
+    ValidateRle(values, width, expected_buffer, 4);
+  }
+
+  for (int width = 9; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, nullptr, 2 * (1 + BitUtil::Ceil(width, 8)));
+  }
+
+  // Test 100 0's and 1's alternating
+  for (int i = 0; i < 100; ++i) {
+    values[i] = i % 2;
+  }
+  int num_groups = BitUtil::Ceil(100, 8);
+  expected_buffer[0] = (num_groups << 1) | 1;
+  for (int i = 0; i < 100/8; ++i) {
+    expected_buffer[i + 1] = BOOST_BINARY(1 0 1 0 1 0 1 0); // 0xaa
+  }
+  // Values for the last 4 0 and 1's
+  expected_buffer[1 + 100/8] = BOOST_BINARY(0 0 0 0 1 0 1 0); // 0x0a
+
+  // num_groups and expected_buffer only valid for bit width = 1
+  ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+  for (int width = 2; width <= MAX_WIDTH; ++width) {
+    ValidateRle(values, width, nullptr, 1 + BitUtil::Ceil(width * 100, 8));
+  }
+}
+
+// ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, that value
+// is used, otherwise the sequence 0, 1, 2, ... is used (taken mod 2^bit_width for
+// widths below 64).
+void TestRleValues(int bit_width, int num_vals, int value = -1) {
+  const uint64_t mod = bit_width == 64 ? 1ULL : 1ULL << bit_width;
+  vector<uint64_t> values;
+  for (uint64_t v = 0; v < num_vals; ++v) {
+    values.push_back((value != -1) ? value : (bit_width == 64 ? v : (v % mod)));
+  }
+  ValidateRle(values, bit_width, nullptr, -1);
+}
+
+TEST(Rle, TestValues) {
+  for (int width = 1; width <= MAX_WIDTH; ++width) {
+    TestRleValues(width, 1);
+    TestRleValues(width, 1024);
+    TestRleValues(width, 1024, 0);
+    TestRleValues(width, 1024, 1);
+  }
+}
+
+class BitRle : public KuduTest {
+};
+
+// Tests all true/false values
+TEST_F(BitRle, AllSame) {
+  const int kTestLen = 1024;
+  vector<bool> values;
+
+  for (int v = 0; v < 2; ++v) {
+    values.clear();
+    for (int i = 0; i < kTestLen; ++i) {
+      values.push_back(v ? true : false);
+    }
+
+    ValidateRle(values, 1, nullptr, 3);
+  }
+}
+
+// Test that writes out a repeated group and then a literal
+// group but flush before finishing.
+TEST_F(BitRle, Flush) {
+  vector<bool> values;
+  for (int i = 0; i < 16; ++i) values.push_back(1);
+  values.push_back(false);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+  values.push_back(true);
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+// Test some random bool sequences.
+TEST_F(BitRle, RandomBools) {
+  int iters = 0;
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  while (iters < n_iters) {
+    const int seed = iters++;
+    srand(seed);
+    // Attach the seed actually used to any failure below so it can be reproduced.
+    SCOPED_TRACE("Seed: " + std::to_string(seed));
+    vector<uint64_t> values;
+    bool parity = false;
+    for (int i = 0; i < 1000; ++i) {
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int j = 0; j < group_size; ++j) {
+        values.push_back(parity);
+      }
+      parity = !parity;
+    }
+    ValidateRle(values, (iters % MAX_WIDTH) + 1, nullptr, -1);
+  }
+}
+
+// Test some random 64-bit sequences.
+TEST_F(BitRle, Random64Bit) {
+  int iters = 0;
+  const int n_iters = AllowSlowTests() ? 1000 : 20;
+  while (iters < n_iters) {
+    const int seed = iters++;
+    srand(seed);
+    // Attach the seed actually used to any failure below so it can be reproduced.
+    SCOPED_TRACE("Seed: " + std::to_string(seed));
+    vector<uint64_t> values;
+    for (int i = 0; i < 1000; ++i) {
+      int group_size = rand() % 20 + 1; // NOLINT(*)
+      uint64_t cur_value = (static_cast<uint64_t>(rand()) << 32) + static_cast<uint64_t>(rand());
+      if (group_size > 16) {
+        group_size = 1;
+      }
+      for (int j = 0; j < group_size; ++j) {
+        values.push_back(cur_value);
+      }
+    }
+    ValidateRle(values, 64, nullptr, -1);
+  }
+}
+
+// Test a sequence of 1 0's, 2 1's, 3 0's. etc
+// e.g.
011000111100000
+// NOTE: because v = i % 2 below, the runs actually start with a single 1, i.e.
+// 100111000011111...; run lengths grow to max_run and then shrink back to min_run.
+TEST_F(BitRle, RepeatedPattern) {
+  vector<bool> values;
+  const int min_run = 1;
+  const int max_run = 32;
+
+  for (int i = min_run; i <= max_run; ++i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  // And go back down again
+  for (int i = max_run; i >= min_run; --i) {
+    int v = i % 2;
+    for (int j = 0; j < i; ++j) {
+      values.push_back(v);
+    }
+  }
+
+  ValidateRle(values, 1, nullptr, -1);
+}
+
+TEST_F(TestRle, TestBulkPut) {
+  size_t run_length;
+  bool val = false;
+
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+  encoder.Put(true, 10);
+  encoder.Put(false, 7);
+  encoder.Put(true, 5);
+  encoder.Put(true, 15);
+  encoder.Flush();
+
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(10, run_length);
+
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(7, run_length);
+
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(20, run_length);
+
+  ASSERT_EQ(0, decoder.GetNextRun(&val, MathLimits<size_t>::kMax));
+}
+
+TEST_F(TestRle, TestGetNextRun) {
+  // Repeat the test with different number of items
+  for (int num_items = 7; num_items < 200; num_items += 13) {
+    // Test different block patterns
+    //    1: 01010101 01010101
+    //    2: 00110011 00110011
+    //    3: 00011100 01110001
+    //    ...
+    for (int block = 1; block <= 20; ++block) {
+      faststring buffer(1);
+      RleEncoder<bool> encoder(&buffer, 1);
+      for (int j = 0; j < num_items; ++j) {
+        encoder.Put(!!(j & 1), block);
+      }
+      encoder.Flush();
+
+      RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+      size_t count = num_items * block;
+      for (int j = 0; j < num_items; ++j) {
+        size_t run_length;
+        bool val = false;
+        // ASSERT rather than DCHECK: DCHECKs vanish in release builds and abort
+        // (instead of reporting a test failure) in debug builds.
+        ASSERT_GT(count, 0U);
+        run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+        run_length = std::min(run_length, count);
+
+        ASSERT_EQ(!!(j & 1), val);
+        ASSERT_EQ(block, run_length);
+        count -= run_length;
+      }
+      ASSERT_EQ(0U, count);
+    }
+  }
+}
+
+// Generate a random bit string which consists of 'num_runs' runs,
+// each with a random length between 0 and 99 (so zero-length runs are
+// possible). Returns the number of values encoded (i.e. the sum of the run lengths).
+static size_t GenerateRandomBitString(int num_runs, faststring* enc_buf, string* string_rep) {
+  RleEncoder<bool> enc(enc_buf, 1);
+  int num_bits = 0;
+  for (int i = 0; i < num_runs; i++) {
+    int run_length = random() % 100;
+    bool value = static_cast<bool>(i & 1);
+    enc.Put(value, run_length);
+    string_rep->append(run_length, value ? '1' : '0');
+    num_bits += run_length;
+  }
+  enc.Flush();
+  return num_bits;
+}
+
+TEST_F(TestRle, TestRoundTripRandomSequencesWithRuns) {
+  SeedRandom();
+
+  // Test the limiting function of GetNextRun.
+  const int kMaxToReadAtOnce = (random() % 20) + 1;
+
+  // Generate a bunch of random bit sequences, and "round-trip" them
+  // through the encode/decode sequence.
+  for (int rep = 0; rep < 100; rep++) {
+    faststring buf;
+    string string_rep;
+    int num_bits = GenerateRandomBitString(10, &buf, &string_rep);
+    RleDecoder<bool> decoder(buf.data(), buf.size(), 1);
+    string roundtrip_str;
+    int rem_to_read = num_bits;
+    size_t run_len;
+    bool val;
+    while (rem_to_read > 0 &&
+           (run_len = decoder.GetNextRun(&val, std::min(kMaxToReadAtOnce, rem_to_read))) != 0) {
+      ASSERT_LE(run_len, kMaxToReadAtOnce);
+      roundtrip_str.append(run_len, val ? '1' : '0');
+      rem_to_read -= run_len;
+    }
+
+    ASSERT_EQ(string_rep, roundtrip_str);
+  }
+}
+
+TEST_F(TestRle, TestSkip) {
+  faststring buffer(1);
+  RleEncoder<bool> encoder(&buffer, 1);
+
+  // 0101010[1] 01010101 01
+  //        "A"
+  for (int j = 0; j < 18; ++j) {
+    encoder.Put(!!(j & 1));
+  }
+
+  // 0011[00] 11001100 11001100 11001100 11001100
+  //      "B"
+  for (int j = 0; j < 19; ++j) {
+    encoder.Put(!!(j & 1), 2);
+  }
+
+  // 000000000000 11[1111111111] 000000000000 111111111111
+  //                   "C"
+  // 000000000000 111111111111 0[00000000000] 111111111111
+  //                                  "D"
+  // 000000000000 111111111111 000000000000 111111111111
+  for (int j = 0; j < 12; ++j) {
+    encoder.Put(!!(j & 1), 12);
+  }
+  encoder.Flush();
+
+  bool val = false;
+  size_t run_length;
+  RleDecoder<bool> decoder(buffer.data(), encoder.len(), 1);
+
+  // position before "A"
+  ASSERT_EQ(3, decoder.Skip(7));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(1, run_length);
+
+  // position before "B"
+  ASSERT_EQ(7, decoder.Skip(14));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(2, run_length);
+
+  // position before "C"
+  ASSERT_EQ(18, decoder.Skip(46));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_TRUE(val);
+  ASSERT_EQ(10, run_length);
+
+  // position before "D"
+  ASSERT_EQ(24, decoder.Skip(49));
+  run_length = decoder.GetNextRun(&val, MathLimits<size_t>::kMax);
+  ASSERT_FALSE(val);
+  ASSERT_EQ(11, run_length);
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/rolling_log-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/rolling_log-test.cc b/be/src/kudu/util/rolling_log-test.cc
new file mode 100644
index 0000000..3c6f60b
--- /dev/null
+++ b/be/src/kudu/util/rolling_log-test.cc
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/rolling_log.h"
+
+#include <unistd.h>
+
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Fixture giving each test a fresh 'log_dir' directory under the test path.
+class RollingLogTest : public KuduTest {
+ public:
+  RollingLogTest()
+    : log_dir_(GetTestPath("log_dir")) {
+  }
+
+  virtual void SetUp() OVERRIDE {
+    ASSERT_OK(env_->CreateDir(log_dir_));
+  }
+
+ protected:
+  // Lists log_dir_ (skipping "." and "..") into '*children', asserting that every
+  // entry starts with "rolling_log-test.", contains ".mylog.", and ends with this
+  // process's pid (optionally followed by ".gz"), and that exactly
+  // 'expected_count' entries exist.
+  void AssertLogCount(int expected_count, vector<string>* children) {
+    vector<string> dir_entries;
+    ASSERT_OK(env_->GetChildren(log_dir_, &dir_entries));
+    children->clear();
+
+    // Loop-invariant: the pid does not change while listing the directory.
+    const string pid_suffix = Substitute("$0", getpid());
+    for (const string& child : dir_entries) {
+      if (child == "." || child == "..") continue;
+      children->push_back(child);
+      ASSERT_TRUE(HasPrefixString(child, "rolling_log-test."));
+      ASSERT_STR_CONTAINS(child, ".mylog.");
+      ASSERT_TRUE(HasSuffixString(child, pid_suffix) ||
+                  HasSuffixString(child, pid_suffix + ".gz")) << "bad child: " << child;
+    }
+    ASSERT_EQ(children->size(), expected_count) << *children;
+  }
+
+  const string log_dir_;
+};
+
+// Test with compression off.
+TEST_F(RollingLogTest, TestLog) {
+  RollingLog log(env_, log_dir_, "mylog");
+  log.SetCompressionEnabled(false);
+  log.SetSizeLimitBytes(100);
+
+  // Before writing anything, we shouldn't open a log file.
+  vector<string> children;
+  NO_FATALS(AssertLogCount(0, &children));
+
+  // Appending some data should write a new segment.
+  ASSERT_OK(log.Append("Hello world\n"));
+  NO_FATALS(AssertLogCount(1, &children));
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(log.Append("Hello world\n"));
+  }
+  NO_FATALS(AssertLogCount(2, &children));
+
+  faststring data;
+  string path = JoinPathSegments(log_dir_, children[0]);
+  ASSERT_OK(ReadFileToString(env_, path, &data));
+  ASSERT_TRUE(HasPrefixString(data.ToString(), "Hello world\n"))
+      << "Data missing";
+  ASSERT_LE(data.size(), 100U) << "Size limit not respected";
+}
+
+// Test with compression on.
+TEST_F(RollingLogTest, TestCompression) {
+  RollingLog log(env_, log_dir_, "mylog");
+  ASSERT_OK(log.Open());
+
+  StringPiece data = "Hello world\n";
+  // Unsigned so it compares cleanly against the uint64_t file size below.
+  uint64_t raw_size = 0;
+  for (int i = 0; i < 1000; i++) {
+    ASSERT_OK(log.Append(data));
+    raw_size += data.size();
+  }
+  ASSERT_OK(log.Close());
+
+  vector<string> children;
+  NO_FATALS(AssertLogCount(1, &children));
+  ASSERT_TRUE(HasSuffixString(children[0], ".gz"));
+
+  // Ensure that the output is actually gzipped.
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(JoinPathSegments(log_dir_, children[0]), &size));
+  ASSERT_LT(size, raw_size / 10);
+  ASSERT_GT(size, 0U);
+}
+
+} // namespace kudu
