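Start a background thread to run ResultTracker GC

ResultTracker::StartGCThread() launches a thread that calls GCResults()
every --result_tracker_gc_interval_ms (default 1000 ms). ServerBase::Init()
starts this thread, and the ResultTracker destructor stops it (via a
CountDownLatch) and joins it. The exactly-once stress test now relies on
this thread instead of running its own GC loop.

Change-Id: Ia34ce95e78920596eb8b9db53643845f637c8e6c
Reviewed-on: http://gerrit.cloudera.org:8080/3961
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins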
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dccca07c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dccca07c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dccca07c Branch: refs/heads/master Commit: dccca07cf14c3d32a80084d0f38d4fe8f0ae1d27 Parents: fc09ddc Author: Todd Lipcon <[email protected]> Authored: Fri Aug 12 17:07:40 2016 -0700 Committer: Todd Lipcon <[email protected]> Committed: Tue Aug 16 05:58:58 2016 +0000 ---------------------------------------------------------------------- src/kudu/rpc/exactly_once_rpc-test.cc | 32 ++++++++++-------------------- src/kudu/rpc/result_tracker.cc | 25 ++++++++++++++++++++++- src/kudu/rpc/result_tracker.h | 18 +++++++++++++++-- src/kudu/rpc/service_if.cc | 2 +- src/kudu/server/server_base.cc | 2 ++ 5 files changed, 54 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/exactly_once_rpc-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc index 3f3f59e..0eacd42 100644 --- a/src/kudu/rpc/exactly_once_rpc-test.cc +++ b/src/kudu/rpc/exactly_once_rpc-test.cc @@ -21,6 +21,7 @@ DECLARE_int64(remember_clients_ttl_ms); DECLARE_int64(remember_responses_ttl_ms); +DECLARE_int64(result_tracker_gc_interval_ms); using std::atomic_int; using std::shared_ptr; @@ -247,16 +248,6 @@ class ExactlyOnceRpcTest : public RpcTestBase { request_tracker_->RpcCompleted(seq_no); } - // Continuously runs GC on the ResultTracker. 
- void RunGcThread(MonoDelta run_for) { - MonoTime run_until = MonoTime::Now(); - run_until.AddDelta(run_for); - while (MonoTime::Now().ComesBefore(run_until)) { - result_tracker_->GCResults(); - SleepFor(MonoDelta::FromMilliseconds(rand() % 10)); - } - } - // This continuously issues calls to the server, that often last longer than // 'remember_responses_ttl_ms', making sure that we don't get errors back. @@ -546,27 +537,22 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) { TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) { FLAGS_remember_clients_ttl_ms = 100; FLAGS_remember_responses_ttl_ms = 10; + FLAGS_result_tracker_gc_interval_ms = 10; StartServer(); - // The write thread runs for the shortest period to make sure client GC has a + // The write thread runs for a shorter period to make sure client GC has a // chance to run. MonoDelta writes_run_for = MonoDelta::FromSeconds(2); MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3); - // GC runs for the longest period because the stubborn thread may wait beyond its deadline - // to wait on client GC. 
- MonoDelta gc_run_for = MonoDelta::FromSeconds(4); if (AllowSlowTests()) { writes_run_for = MonoDelta::FromSeconds(10); stubborn_run_for = MonoDelta::FromSeconds(11); - gc_run_for = MonoDelta::FromSeconds(12); } - scoped_refptr<kudu::Thread> gc_thread; scoped_refptr<kudu::Thread> write_thread; scoped_refptr<kudu::Thread> stubborn_thread; - CHECK_OK(kudu::Thread::Create( - "gc", "gc", &ExactlyOnceRpcTest::RunGcThread, this, gc_run_for, &gc_thread)); + result_tracker_->StartGCThread(); CHECK_OK(kudu::Thread::Create( "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread, this, writes_run_for, &write_thread)); @@ -574,13 +560,17 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread, this, stubborn_run_for, &stubborn_thread)); - gc_thread->Join(); write_thread->Join(); stubborn_thread->Join(); - result_tracker_->GCResults(); - ASSERT_EQ(0, mem_tracker_->consumption()); + // Within a few seconds, the consumption should be back to zero. + // Really, this should be within 100ms, but we'll give it a bit of + // time to avoid test flakiness. 
+ AssertEventually([&]() { + ASSERT_EQ(0, mem_tracker_->consumption()); + }, MonoDelta::FromSeconds(5)); } + } // namespace rpc } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc index a75b3e1..259d12a 100644 --- a/src/kudu/rpc/result_tracker.cc +++ b/src/kudu/rpc/result_tracker.cc @@ -45,6 +45,10 @@ DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */, "STALE."); TAG_FLAG(remember_responses_ttl_ms, advanced); +DEFINE_int64(result_tracker_gc_interval_ms, 1000, + "Interval at which the result tracker will look for entries to GC."); +TAG_FLAG(result_tracker_gc_interval_ms, hidden); + namespace kudu { namespace rpc { @@ -89,9 +93,15 @@ struct ScopedMemTrackerUpdater { ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker) : mem_tracker_(std::move(mem_tracker)), clients_(ClientStateMap::key_compare(), - ClientStateMapAllocator(mem_tracker_)) {} + ClientStateMapAllocator(mem_tracker_)), + gc_thread_stop_latch_(1) {} ResultTracker::~ResultTracker() { + if (gc_thread_) { + gc_thread_stop_latch_.CountDown(); + gc_thread_->Join(); + } + lock_guard<simple_spinlock> l(lock_); // Release all the memory for the stuff we'll delete on destruction. 
for (auto& client_state : clients_) { @@ -411,6 +421,19 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id, FailAndRespondInternal(request_id, func); } +void ResultTracker::StartGCThread() { + CHECK(!gc_thread_); + CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread, + this, &gc_thread_)); +} + +void ResultTracker::RunGCThread() { + while (!gc_thread_stop_latch_.WaitFor(MonoDelta::FromMilliseconds( + FLAGS_result_tracker_gc_interval_ms))) { + GCResults(); + } +} + void ResultTracker::GCResults() { lock_guard<simple_spinlock> l(lock_); MonoTime now = MonoTime::Now(); http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.h ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h index 5c1d518..f629d7a 100644 --- a/src/kudu/rpc/result_tracker.h +++ b/src/kudu/rpc/result_tracker.h @@ -26,10 +26,12 @@ #include "kudu/gutil/stl_util.h" #include "kudu/rpc/request_tracker.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/countdown_latch.h" #include "kudu/util/locks.h" #include "kudu/util/malloc.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/monotime.h" +#include "kudu/util/thread.h" namespace google { namespace protobuf { @@ -141,8 +143,6 @@ class RpcContext; // } // // This class is thread safe. -// -// TODO Garbage collection. class ResultTracker : public RefCountedThreadSafe<ResultTracker> { public: typedef rpc::RequestTracker::SequenceNumber SequenceNumber; @@ -222,6 +222,12 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> { int error_ext_id, const std::string& message, const google::protobuf::Message& app_error_pb); + // Start a background thread which periodically runs GCResults(). + // This thread is automatically stopped in the destructor. + // + // Must be called at most once. 
+ void StartGCThread(); + // Runs time-based garbage collection on the results this result tracker is caching. // When garbage collection runs, it goes through all ClientStates and: // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no @@ -230,6 +236,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> { // through all CompletionRecords and: // - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag, // GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark. + // + // Typically this is invoked from an internal thread started by 'StartGCThread()'. void GCResults(); string ToString(); @@ -361,6 +369,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> { std::string ToStringUnlocked() const; + void RunGCThread(); + // The memory tracker that tracks this ResultTracker's memory consumption. std::shared_ptr<kudu::MemTracker> mem_tracker_; @@ -378,6 +388,10 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> { ClientStateMap clients_; + // The thread which runs GC, and a latch to stop it. + scoped_refptr<Thread> gc_thread_; + CountDownLatch gc_thread_stop_latch_; + DISALLOW_COPY_AND_ASSIGN(ResultTracker); }; http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/service_if.cc ---------------------------------------------------------------------- diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc index f3863f2..d64647b 100644 --- a/src/kudu/rpc/service_if.cc +++ b/src/kudu/rpc/service_if.cc @@ -29,7 +29,7 @@ #include "kudu/rpc/rpc_header.pb.h" #include "kudu/util/flag_tags.h" -// TODO remove this once we have ResultTracker GC +// TODO remove this once we have fully cluster-tested this. 
DEFINE_bool(enable_exactly_once, false, "Whether to enable exactly once semantics on the client " "(experimental)."); TAG_FLAG(enable_exactly_once, experimental); http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/server/server_base.cc ---------------------------------------------------------------------- diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc index 6010f23..6d84393 100644 --- a/src/kudu/server/server_base.cc +++ b/src/kudu/server/server_base.cc @@ -185,6 +185,8 @@ Status ServerBase::Init() { RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging"); + result_tracker_->StartGCThread(); + return Status::OK(); }
