This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8808b04  [clock] update on Clock interface
8808b04 is described below

commit 8808b041c9db0af7642311390d7d9189032cc36a
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Feb 12 19:18:03 2020 -0800

    [clock] update on Clock interface
    
    This patch refactors the Clock-related classes:
      * the Clock::RegisterMetrics() method is removed
      * the HybridClock constructor now requires a metric entity
      * the LogicalClock constructor accepts a metric entity as an optional
        second parameter
      * the LogicalClock constructor is now public
      * LogicalClock::CreateStartingAt() is removed
    
    I also did some other minor refactoring, partly prompted by ClangTidy
    warnings on the code I touched.
    
    The motivation for this change is to prepare for follow-up changelists
    addressing KUDU-3048 (adding clock metrics for better observability).
    
    Change-Id: Ic4c1944d54bf50e54c06c12e2fb9e57fc452b877
    Reviewed-on: http://gerrit.cloudera.org:8080/15215
    Tested-by: Kudu Jenkins
    Reviewed-by: Volodymyr Verovkin <[email protected]>
    Reviewed-by: Adar Dembo <[email protected]>
---
 src/kudu/clock/clock.h                           |   3 -
 src/kudu/clock/hybrid_clock-test.cc              | 147 ++++++++++++-----------
 src/kudu/clock/hybrid_clock.cc                   |  24 ++--
 src/kudu/clock/hybrid_clock.h                    |  12 +-
 src/kudu/clock/logical_clock-test.cc             |  44 +++----
 src/kudu/clock/logical_clock.cc                  |  23 ++--
 src/kudu/clock/logical_clock.h                   |  12 +-
 src/kudu/consensus/consensus_peers-test.cc       |  29 +++--
 src/kudu/consensus/consensus_queue-test.cc       |  14 ++-
 src/kudu/consensus/log-test-base.h               |   4 +-
 src/kudu/consensus/log_cache-test.cc             |  15 ++-
 src/kudu/consensus/raft_consensus_quorum-test.cc |  15 ++-
 src/kudu/consensus/time_manager-test.cc          |  36 ++++--
 src/kudu/integration-tests/ts_recovery-itest.cc  |  11 +-
 src/kudu/server/server_base.cc                   |   6 +-
 src/kudu/tablet/compaction-test.cc               |  29 ++---
 src/kudu/tablet/deltamemstore-test.cc            |  29 +++--
 src/kudu/tablet/diskrowset-test-base.h           |  14 +--
 src/kudu/tablet/diskrowset-test.cc               |  11 +-
 src/kudu/tablet/memrowset-test.cc                |  19 ++-
 src/kudu/tablet/mvcc-test.cc                     | 110 ++++++++---------
 src/kudu/tablet/mvcc.h                           |   2 +-
 src/kudu/tablet/tablet-harness.h                 |  33 ++---
 src/kudu/tablet/tablet-test-util.h               |   1 -
 src/kudu/tools/tool_action_perf.cc               |   8 +-
 25 files changed, 336 insertions(+), 315 deletions(-)

diff --git a/src/kudu/clock/clock.h b/src/kudu/clock/clock.h
index 79ac84e..020679c 100644
--- a/src/kudu/clock/clock.h
+++ b/src/kudu/clock/clock.h
@@ -103,9 +103,6 @@ class Clock {
   // to Now() would return a higher value than t).
   virtual bool IsAfter(Timestamp t) = 0;
 
-  // Register the clock metrics in the given entity.
-  virtual void RegisterMetrics(const scoped_refptr<MetricEntity>& 
metric_entity) = 0;
-
   // Strigifies the provided timestamp according to this clock's internal 
format.
   virtual std::string Stringify(Timestamp timestamp) = 0;
 };
diff --git a/src/kudu/clock/hybrid_clock-test.cc 
b/src/kudu/clock/hybrid_clock-test.cc
index 422daf4..378ec78 100644
--- a/src/kudu/clock/hybrid_clock-test.cc
+++ b/src/kudu/clock/hybrid_clock-test.cc
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <memory>
 #include <string>
 #include <vector>
 
@@ -34,6 +33,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
@@ -43,60 +43,59 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
-
 DECLARE_bool(inject_unsync_time_errors);
 DECLARE_string(time_source);
+METRIC_DECLARE_entity(server);
 
 using std::string;
-using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
 namespace clock {
 
-class HybridClockTest : public KuduTest {
+class ClockTest : public KuduTest {
  public:
-  HybridClockTest()
-      : clock_(new HybridClock) {
-  }
-
-  void SetUp() override {
-    KuduTest::SetUp();
-    ASSERT_OK(clock_->Init());
+  ClockTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                        "clock-test")) {
   }
 
  protected:
-  unique_ptr<HybridClock> clock_;
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+};
+
+class MockHybridClockTest : public ClockTest {
 };
 
-clock::MockNtp* mock_ntp(HybridClock* clock) {
-  return down_cast<clock::MockNtp*>(clock->time_service());
+clock::MockNtp* mock_ntp(const HybridClock& clock) {
+  return down_cast<clock::MockNtp*>(clock.time_service());
 }
 
-TEST(MockHybridClockTest, TestMockedSystemClock) {
+TEST_F(MockHybridClockTest, TestMockedSystemClock) {
   google::FlagSaver saver;
   FLAGS_time_source = "mock";
-  unique_ptr<HybridClock> clock(new HybridClock);
-  ASSERT_OK(clock->Init());
+  HybridClock clock(metric_entity_);
+  ASSERT_OK(clock.Init());
   Timestamp timestamp;
   uint64_t max_error_usec;
-  clock->NowWithError(&timestamp, &max_error_usec);
+  clock.NowWithError(&timestamp, &max_error_usec);
   ASSERT_EQ(timestamp.ToUint64(), 0);
   ASSERT_EQ(max_error_usec, 0);
   // If we read the clock again we should see the logical component be 
incremented.
-  clock->NowWithError(&timestamp, &max_error_usec);
+  clock.NowWithError(&timestamp, &max_error_usec);
   ASSERT_EQ(timestamp.ToUint64(), 1);
   // Now set an arbitrary time and check that is the time returned by the 
clock.
   uint64_t time = 1234 * 1000;
   uint64_t error = 100 * 1000;
-  mock_ntp(clock.get())->SetMockClockWallTimeForTests(time);
-  mock_ntp(clock.get())->SetMockMaxClockErrorForTests(error);
-  clock->NowWithError(&timestamp, &max_error_usec);
+  mock_ntp(clock)->SetMockClockWallTimeForTests(time);
+  mock_ntp(clock)->SetMockMaxClockErrorForTests(error);
+  clock.NowWithError(&timestamp, &max_error_usec);
   ASSERT_EQ(timestamp.ToUint64(),
             HybridClock::TimestampFromMicrosecondsAndLogicalValue(time, 
0).ToUint64());
   ASSERT_EQ(max_error_usec, error);
   // Perform another read, we should observe the logical component increment, 
again.
-  clock->NowWithError(&timestamp, &max_error_usec);
+  clock.NowWithError(&timestamp, &max_error_usec);
   ASSERT_EQ(timestamp.ToUint64(),
             HybridClock::TimestampFromMicrosecondsAndLogicalValue(time, 
1).ToUint64());
 }
@@ -107,22 +106,22 @@ TEST(MockHybridClockTest, TestMockedSystemClock) {
 // guarantees even as the physical clock continues to increase.
 //
 // This is a regression test for KUDU-1345.
-TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
+TEST_F(MockHybridClockTest, TestClockDealsWithWrapping) {
   google::FlagSaver saver;
   FLAGS_time_source = "mock";
-  unique_ptr<HybridClock> clock(new HybridClock);
-  ASSERT_OK(clock->Init());
-  mock_ntp(clock.get())->SetMockClockWallTimeForTests(1000);
+  HybridClock clock(metric_entity_);
+  ASSERT_OK(clock.Init());
+  mock_ntp(clock)->SetMockClockWallTimeForTests(1000);
 
-  Timestamp prev = clock->Now();
+  Timestamp prev = clock.Now();
 
   // Update the clock from 10us in the future
-  ASSERT_OK(clock->Update(HybridClock::TimestampFromMicroseconds(1010)));
+  ASSERT_OK(clock.Update(HybridClock::TimestampFromMicroseconds(1010)));
 
   // Now read the clock value enough times so that the logical value wraps
   // over, and should increment the _physical_ portion of the clock.
   for (int i = 0; i < 10000; i++) {
-    Timestamp now = clock->Now();
+    Timestamp now = clock.Now();
     ASSERT_GT(now.value(), prev.value());
     prev = now;
   }
@@ -131,8 +130,8 @@ TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
   // Advance the time microsecond by microsecond, and ensure the clock never
   // goes backwards.
   for (int time = 1001; time < 1020; time++) {
-    mock_ntp(clock.get())->SetMockClockWallTimeForTests(time);
-    Timestamp now = clock->Now();
+    mock_ntp(clock)->SetMockClockWallTimeForTests(time);
+    Timestamp now = clock.Now();
 
     // Clock should run strictly forwards.
     ASSERT_GT(now.value(), prev.value());
@@ -150,16 +149,31 @@ TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
   }
 }
 
+class HybridClockTest : public ClockTest {
+ public:
+  HybridClockTest()
+      : clock_(metric_entity_) {
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(clock_.Init());
+  }
+
+ protected:
+  HybridClock clock_;
+};
+
 // Test that two subsequent time reads are monotonically increasing.
-TEST_F(HybridClockTest, TestNow_ValuesIncreaseMonotonically) {
-  const Timestamp now1 = clock_->Now();
-  const Timestamp now2 = clock_->Now();
+TEST_F(HybridClockTest, NowValuesIncreaseMonotonically) {
+  const Timestamp now1 = clock_.Now();
+  const Timestamp now2 = clock_.Now();
   ASSERT_LT(now1.value(), now2.value());
 }
 
 // Tests the clock updates with the incoming value if it is higher.
-TEST_F(HybridClockTest, TestUpdate_LogicalValueIncreasesByAmount) {
-  Timestamp now = clock_->Now();
+TEST_F(HybridClockTest, UpdateLogicalValueIncreasesByAmount) {
+  Timestamp now = clock_.Now();
   uint64_t now_micros = HybridClock::GetPhysicalValueMicros(now);
 
   // increase the logical value
@@ -170,31 +184,31 @@ TEST_F(HybridClockTest, 
TestUpdate_LogicalValueIncreasesByAmount) {
   // one, 200 msecs should be more than enough.
   now_micros += 200000;
 
-  Timestamp now_increased = 
HybridClock::TimestampFromMicrosecondsAndLogicalValue(now_micros,
-                                                                               
   logical);
+  auto now_increased = HybridClock::TimestampFromMicrosecondsAndLogicalValue(
+      now_micros, logical);
 
-  ASSERT_OK(clock_->Update(now_increased));
+  ASSERT_OK(clock_.Update(now_increased));
 
-  Timestamp now2 = clock_->Now();
+  Timestamp now2 = clock_.Now();
   ASSERT_EQ(logical + 1, HybridClock::GetLogicalValue(now2));
   ASSERT_EQ(HybridClock::GetPhysicalValueMicros(now) + 200000,
             HybridClock::GetPhysicalValueMicros(now2));
 }
 
 // Test that the incoming event is in the past, i.e. less than now - max_error
-TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase1) {
+TEST_F(HybridClockTest, WaitUntilAfterCase1) {
   MonoTime no_deadline;
   MonoTime before = MonoTime::Now();
 
   Timestamp past_ts;
   uint64_t max_error;
-  clock_->NowWithError(&past_ts, &max_error);
+  clock_.NowWithError(&past_ts, &max_error);
 
   // make the event 3 * the max. possible error in the past
   Timestamp past_ts_changed = HybridClock::AddPhysicalTimeToTimestamp(
       past_ts,
       MonoDelta::FromMicroseconds(-3 * static_cast<int64_t>(max_error)));
-  ASSERT_OK(clock_->WaitUntilAfter(past_ts_changed, no_deadline));
+  ASSERT_OK(clock_.WaitUntilAfter(past_ts_changed, no_deadline));
 
   MonoTime after = MonoTime::Now();
   MonoDelta delta = after - before;
@@ -205,14 +219,14 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase1) {
 
 // The normal case for transactions. Obtain a timestamp and then wait until
 // we're sure that tx_latest < now_earliest.
-TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
+TEST_F(HybridClockTest, WaitUntilAfterCase2) {
   const MonoTime before = MonoTime::Now();
 
   // we do no time adjustment, this event should fall right within the possible
   // error interval
   Timestamp past_ts;
   uint64_t past_max_error;
-  clock_->NowWithError(&past_ts, &past_max_error);
+  clock_.NowWithError(&past_ts, &past_max_error);
   // Make sure the error is at least a small number of microseconds, to ensure
   // that we always have to wait.
   past_max_error = std::max(past_max_error, static_cast<uint64_t>(2000));
@@ -222,19 +236,19 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
 
   Timestamp current_ts;
   uint64_t current_max_error;
-  clock_->NowWithError(&current_ts, &current_max_error);
+  clock_.NowWithError(&current_ts, &current_max_error);
 
   // Check waiting with a deadline which already expired.
   {
     MonoTime deadline = before;
-    Status s = clock_->WaitUntilAfter(wait_until, deadline);
+    Status s = clock_.WaitUntilAfter(wait_until, deadline);
     ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
   }
 
   // Wait with a deadline well in the future. This should succeed.
   {
     MonoTime deadline = before + MonoDelta::FromSeconds(60);
-    ASSERT_OK(clock_->WaitUntilAfter(wait_until, deadline));
+    ASSERT_OK(clock_.WaitUntilAfter(wait_until, deadline));
   }
 
   MonoTime after = MonoTime::Now();
@@ -251,19 +265,19 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
 }
 
 TEST_F(HybridClockTest, TestIsAfter) {
-  Timestamp ts1 = clock_->Now();
-  ASSERT_TRUE(clock_->IsAfter(ts1));
+  Timestamp ts1 = clock_.Now();
+  ASSERT_TRUE(clock_.IsAfter(ts1));
 
   // Update the clock in the future, make sure it still
   // handles "IsAfter" properly even when it's running in
   // "logical" mode.
   Timestamp now_increased = HybridClock::TimestampFromMicroseconds(
     HybridClock::GetPhysicalValueMicros(ts1) + 1 * 1000 * 1000);
-  ASSERT_OK(clock_->Update(now_increased));
-  Timestamp ts2 = clock_->Now();
+  ASSERT_OK(clock_.Update(now_increased));
+  Timestamp ts2 = clock_.Now();
 
-  ASSERT_TRUE(clock_->IsAfter(ts1));
-  ASSERT_TRUE(clock_->IsAfter(ts2));
+  ASSERT_TRUE(clock_.IsAfter(ts1));
+  ASSERT_TRUE(clock_.IsAfter(ts2));
 }
 
 // Thread which loops polling the clock and updating it slightly
@@ -298,8 +312,7 @@ TEST_F(HybridClockTest, 
TestClockDoesntGoBackwardsWithUpdates) {
 
   for (int i = 0; i < 4; i++) {
     scoped_refptr<Thread> thread;
-    ASSERT_OK(Thread::Create("test", "stresser",
-                             &StresserThread, clock_.get(), &stop,
+    ASSERT_OK(Thread::Create("test", "stresser", &StresserThread, &clock_, 
&stop,
                              &thread));
     threads.push_back(thread);
   }
@@ -311,8 +324,8 @@ TEST_F(HybridClockTest, TestGetPhysicalComponentDifference) 
{
   Timestamp now1 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(100, 
100);
   SleepFor(MonoDelta::FromMilliseconds(1));
   Timestamp now2 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(200, 
0);
-  MonoDelta delta = clock_->GetPhysicalComponentDifference(now2, now1);
-  MonoDelta negative_delta = clock_->GetPhysicalComponentDifference(now1, 
now2);
+  MonoDelta delta = clock_.GetPhysicalComponentDifference(now2, now1);
+  MonoDelta negative_delta = clock_.GetPhysicalComponentDifference(now1, now2);
   ASSERT_EQ(100, delta.ToMicroseconds());
   ASSERT_EQ(-100, negative_delta.ToMicroseconds());
 }
@@ -322,31 +335,31 @@ TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
   uint64_t max_error_usec[3];
 
   // Get the clock once, with a working NTP.
-  clock_->NowWithError(&timestamps[0], &max_error_usec[0]);
+  clock_.NowWithError(&timestamps[0], &max_error_usec[0]);
 
   // Try to read the clock again a second later, but with an error
   // injected. It should extrapolate from the first read.
   SleepFor(MonoDelta::FromSeconds(1));
   FLAGS_inject_unsync_time_errors = true;
-  clock_->NowWithError(&timestamps[1], &max_error_usec[1]);
+  clock_.NowWithError(&timestamps[1], &max_error_usec[1]);
 
   // The new clock reading should be a second or longer from the
   // first one, since SleepFor guarantees sleeping at least as long
   // as specified.
-  MonoDelta phys_diff = clock_->GetPhysicalComponentDifference(
+  MonoDelta phys_diff = clock_.GetPhysicalComponentDifference(
       timestamps[1], timestamps[0]);
   ASSERT_GE(phys_diff.ToSeconds(), 1);
 
   // The new clock reading should have higher error than the first.
   // The error should have increased based on the clock skew.
   int64_t error_diff = max_error_usec[1] - max_error_usec[0];
-  ASSERT_NEAR(error_diff, clock_->time_service()->skew_ppm() * 
phys_diff.ToSeconds(),
+  ASSERT_NEAR(error_diff, clock_.time_service()->skew_ppm() * 
phys_diff.ToSeconds(),
               10);
 
   // Now restore the ability to read the system clock, and
   // read it again.
   FLAGS_inject_unsync_time_errors = false;
-  clock_->NowWithError(&timestamps[2], &max_error_usec[2]);
+  clock_.NowWithError(&timestamps[2], &max_error_usec[2]);
 
   ASSERT_LT(timestamps[0].ToUint64(), timestamps[1].ToUint64());
   ASSERT_LT(timestamps[1].ToUint64(), timestamps[2].ToUint64());
@@ -355,11 +368,11 @@ TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
 #if defined(KUDU_HAS_SYSTEM_TIME_SOURCE)
 TEST_F(HybridClockTest, TestNtpDiagnostics) {
   FLAGS_time_source = "system";
-  clock_.reset(new HybridClock);
-  ASSERT_OK(clock_->Init());
+  HybridClock clock(metric_entity_);
+  ASSERT_OK(clock.Init());
 
   vector<string> log;
-  clock_->time_service()->DumpDiagnostics(&log);
+  clock.time_service()->DumpDiagnostics(&log);
   string s = JoinStrings(log, "\n");
   SCOPED_TRACE(s);
   ASSERT_STR_MATCHES(s, "(ntp_gettime\\(\\) returns code |chronyc -n 
tracking)");
diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index 459d605..2be46ec 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -155,10 +155,18 @@ const int HybridClock::kBitsToShift = 12;
 // This mask gives us back the logical bits.
 const uint64_t HybridClock::kLogicalBitMask = (1 << kBitsToShift) - 1;
 
-
-HybridClock::HybridClock()
+HybridClock::HybridClock(const scoped_refptr<MetricEntity>& metric_entity)
     : next_timestamp_(0),
       state_(kNotInitialized) {
+  DCHECK(metric_entity);
+  METRIC_hybrid_clock_timestamp.InstantiateFunctionGauge(
+      metric_entity,
+      Bind(&HybridClock::NowForMetrics, Unretained(this)))->
+          AutoDetachToLastValue(&metric_detacher_);
+  METRIC_hybrid_clock_error.InstantiateFunctionGauge(
+      metric_entity,
+      Bind(&HybridClock::ErrorForMetrics, Unretained(this)))->
+          AutoDetachToLastValue(&metric_detacher_);
 }
 
 Status HybridClock::Init() {
@@ -253,7 +261,6 @@ Status HybridClock::GetGlobalLatest(Timestamp* t) {
 }
 
 void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec) 
{
-
   DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init() 
first.";
 
   uint64_t now_usec;
@@ -515,17 +522,6 @@ uint64_t HybridClock::ErrorForMetrics() {
   return error;
 }
 
-void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>& 
metric_entity) {
-  METRIC_hybrid_clock_timestamp.InstantiateFunctionGauge(
-      metric_entity,
-      Bind(&HybridClock::NowForMetrics, Unretained(this)))
-    ->AutoDetachToLastValue(&metric_detacher_);
-  METRIC_hybrid_clock_error.InstantiateFunctionGauge(
-      metric_entity,
-      Bind(&HybridClock::ErrorForMetrics, Unretained(this)))
-    ->AutoDetachToLastValue(&metric_detacher_);
-}
-
 string HybridClock::Stringify(Timestamp timestamp) {
   return StringifyTimestamp(timestamp);
 }
diff --git a/src/kudu/clock/hybrid_clock.h b/src/kudu/clock/hybrid_clock.h
index dbfa201..be855de 100644
--- a/src/kudu/clock/hybrid_clock.h
+++ b/src/kudu/clock/hybrid_clock.h
@@ -39,7 +39,9 @@ namespace clock {
 // since NTP clock error is not available.
 class HybridClock : public Clock {
  public:
-  HybridClock();
+  // Create an instance, registering HybridClock's metrics with the specified
+  // metric entity.
+  explicit HybridClock(const scoped_refptr<MetricEntity>& metric_entity);
 
   Status Init() override;
 
@@ -59,8 +61,6 @@ class HybridClock : public Clock {
   // Updates the clock with a timestamp originating on another machine.
   Status Update(const Timestamp& to_update) override;
 
-  void RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) 
override;
-
   // HybridClock supports all external consistency modes.
   bool SupportsExternalConsistencyMode(ExternalConsistencyMode mode) override;
 
@@ -152,7 +152,7 @@ class HybridClock : public Clock {
   // separated.
   static std::string StringifyTimestamp(const Timestamp& timestamp);
 
-  clock::TimeService* time_service() {
+  clock::TimeService* time_service() const {
     return time_service_.get();
   }
 
@@ -176,7 +176,7 @@ class HybridClock : public Clock {
   // service.
   std::unique_ptr<clock::TimeService> time_service_;
 
-  mutable simple_spinlock lock_;
+  simple_spinlock lock_;
 
   // The next timestamp to be generated from this clock, assuming that
   // the physical clock hasn't advanced beyond the value stored here.
@@ -184,7 +184,7 @@ class HybridClock : public Clock {
 
   // The last valid clock reading we got from the time source, along
   // with the monotime that we took that reading.
-  mutable simple_spinlock last_clock_read_lock_;
+  simple_spinlock last_clock_read_lock_;
   MonoTime last_clock_read_time_;
   uint64_t last_clock_read_physical_;
   uint64_t last_clock_read_error_;
diff --git a/src/kudu/clock/logical_clock-test.cc 
b/src/kudu/clock/logical_clock-test.cc
index a6b5cb4..7ee06b9 100644
--- a/src/kudu/clock/logical_clock-test.cc
+++ b/src/kudu/clock/logical_clock-test.cc
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <memory>
-
 #include <gtest/gtest.h>
 
 #include "kudu/clock/logical_clock.h"
@@ -26,66 +24,64 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-using std::unique_ptr;
-
 namespace kudu {
 namespace clock {
 
 class LogicalClockTest : public KuduTest {
  public:
   LogicalClockTest()
-      : clock_(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+      : clock_(Timestamp::kInitialTimestamp) {
   }
 
  protected:
-  unique_ptr<LogicalClock> clock_;
+  LogicalClock clock_;
 };
 
 // Test that two subsequent time reads are monotonically increasing.
-TEST_F(LogicalClockTest, TestNow_ValuesIncreaseMonotonically) {
-  const Timestamp now1 = clock_->Now();
-  const Timestamp now2 = clock_->Now();
+TEST_F(LogicalClockTest, NowValuesIncreaseMonotonically) {
+  const Timestamp now1 = clock_.Now();
+  const Timestamp now2 = clock_.Now();
   ASSERT_EQ(now1.value() + 1, now2.value());
 }
 
 // Tests that the clock gets updated if the incoming value is higher.
-TEST_F(LogicalClockTest, TestUpdate_LogicalValueIncreasesByAmount) {
-  Timestamp initial = clock_->Now();
+TEST_F(LogicalClockTest, UpdateLogicalValueIncreasesByAmount) {
+  Timestamp initial = clock_.Now();
   Timestamp future(initial.value() + 10);
-  clock_->Update(future);
-  Timestamp now = clock_->Now();
+  clock_.Update(future);
+  Timestamp now = clock_.Now();
   // now should be 1 after future
   ASSERT_EQ(initial.value() + 11, now.value());
 }
 
 // Tests that the clock doesn't get updated if the incoming value is lower.
-TEST_F(LogicalClockTest, TestUpdate_LogicalValueDoesNotIncrease) {
+TEST_F(LogicalClockTest, UpdateLogicalValueDoesNotIncrease) {
   Timestamp ts(1);
   // update the clock to 1, the initial value, should do nothing
-  clock_->Update(ts);
-  Timestamp now = clock_->Now();
-  ASSERT_EQ(now.value(), 2);
+  clock_.Update(ts);
+  Timestamp now = clock_.Now();
+  ASSERT_EQ(2, now.value());
 }
 
 TEST_F(LogicalClockTest, TestWaitUntilAfterIsUnavailable) {
-  Status status = clock_->WaitUntilAfter(
+  Status status = clock_.WaitUntilAfter(
       Timestamp(10), MonoTime::Now());
   ASSERT_TRUE(status.IsServiceUnavailable());
 }
 
 TEST_F(LogicalClockTest, TestIsAfter) {
-  Timestamp ts1 = clock_->Now();
-  ASSERT_TRUE(clock_->IsAfter(ts1));
+  Timestamp ts1 = clock_.Now();
+  ASSERT_TRUE(clock_.IsAfter(ts1));
 
   // Update the clock in the future, make sure it still
   // handles "IsAfter" properly even when it's running in
   // "logical" mode.
   Timestamp now_increased = Timestamp(1000);
-  ASSERT_OK(clock_->Update(now_increased));
-  Timestamp ts2 = clock_->Now();
+  ASSERT_OK(clock_.Update(now_increased));
+  Timestamp ts2 = clock_.Now();
 
-  ASSERT_TRUE(clock_->IsAfter(ts1));
-  ASSERT_TRUE(clock_->IsAfter(ts2));
+  ASSERT_TRUE(clock_.IsAfter(ts1));
+  ASSERT_TRUE(clock_.IsAfter(ts2));
 }
 
 }  // namespace clock
diff --git a/src/kudu/clock/logical_clock.cc b/src/kudu/clock/logical_clock.cc
index 6fa861e..3403e01 100644
--- a/src/kudu/clock/logical_clock.cc
+++ b/src/kudu/clock/logical_clock.cc
@@ -46,6 +46,17 @@ using std::unique_ptr;
 namespace kudu {
 namespace clock {
 
+LogicalClock::LogicalClock(const Timestamp& timestamp,
+                           const scoped_refptr<MetricEntity>& metric_entity)
+    : now_(timestamp.value() - 1) {
+  if (metric_entity) {
+    METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
+        metric_entity,
+        Bind(&LogicalClock::GetCurrentTime, Unretained(this)))->
+            AutoDetachToLastValue(&metric_detacher_);
+  }
+}
+
 Timestamp LogicalClock::Now() {
   return Timestamp(Barrier_AtomicIncrement(&now_, 1));
 }
@@ -90,23 +101,11 @@ bool LogicalClock::IsAfter(Timestamp t) {
   return base::subtle::Acquire_Load(&now_) >= t.value();
 }
 
-unique_ptr<LogicalClock> LogicalClock::CreateStartingAt(const Timestamp& 
timestamp) {
-  // initialize at 'timestamp' - 1 so that the  first output value is 
'timestamp'.
-  return unique_ptr<LogicalClock>(new LogicalClock(timestamp.value() - 1));
-}
-
 uint64_t LogicalClock::GetCurrentTime() {
   // We don't want reading metrics to change the clock.
   return NoBarrier_Load(&now_);
 }
 
-void LogicalClock::RegisterMetrics(const scoped_refptr<MetricEntity>& 
metric_entity) {
-  METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
-      metric_entity,
-      Bind(&LogicalClock::GetCurrentTime, Unretained(this)))
-    ->AutoDetachToLastValue(&metric_detacher_);
-}
-
 std::string LogicalClock::Stringify(Timestamp timestamp) {
   return strings::Substitute("L: $0", timestamp.ToUint64());
 }
diff --git a/src/kudu/clock/logical_clock.h b/src/kudu/clock/logical_clock.h
index 0a74a51..bd92945 100644
--- a/src/kudu/clock/logical_clock.h
+++ b/src/kudu/clock/logical_clock.h
@@ -17,7 +17,6 @@
 #pragma once
 
 #include <cstdint>
-#include <memory>
 #include <string>
 
 #include "kudu/clock/clock.h"
@@ -47,6 +46,9 @@ namespace clock {
 // NOTE: this class is thread safe.
 class LogicalClock : public Clock {
  public:
+  // Create logical clock starting with the given timestamp.
+  explicit LogicalClock(const Timestamp& timestamp,
+                        const scoped_refptr<MetricEntity>& metric_entity = {});
 
   Status Init() override { return Status::OK(); }
 
@@ -65,8 +67,6 @@ class LogicalClock : public Clock {
 
   bool IsAfter(Timestamp t) override;
 
-  void RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) 
override;
-
   std::string Stringify(Timestamp timestamp) override;
 
   // Used to get the timestamp without incrementing the logical component.
@@ -78,13 +78,7 @@ class LogicalClock : public Clock {
     return mode != COMMIT_WAIT;
   }
 
-  // Creates a logical clock whose first output value on a Now() call is 
'timestamp'.
-  static std::unique_ptr<LogicalClock> CreateStartingAt(const Timestamp& 
timestamp);
-
  private:
-  // Should use LogicalClock::CreatingStartingAt()
-  explicit LogicalClock(Timestamp::val_type initial_time) : now_(initial_time) 
{}
-
   base::subtle::Atomic64 now_;
 
   FunctionGaugeDetacher metric_detacher_;
diff --git a/src/kudu/consensus/consensus_peers-test.cc 
b/src/kudu/consensus/consensus_peers-test.cc
index d3afb92..4a73948 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -54,18 +54,19 @@
 #include "kudu/util/threadpool.h"
 
 METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_entity(server);
 
-namespace kudu {
-namespace consensus {
-
-using log::Log;
-using log::LogOptions;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
+using kudu::log::Log;
+using kudu::log::LogOptions;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 
+namespace kudu {
+namespace consensus {
+
 const char* kTabletId = "test-peers-tablet";
 const char* kLeaderUuid = "peer-0";
 const char* kFollowerUuid = "peer-1";
@@ -73,8 +74,11 @@ const char* kFollowerUuid = "peer-1";
 class ConsensusPeersTest : public KuduTest {
  public:
   ConsensusPeersTest()
-    : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, 
"peer-test")),
-      schema_(GetSimpleTestSchema()) {
+      : metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+            &metric_registry_, "consensus-peer-test::server")),
+        metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+            &metric_registry_, "consensus-peer-test::tablet")),
+        schema_(GetSimpleTestSchema()) {
     CHECK_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
     raft_pool_token_ = 
raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
   }
@@ -92,13 +96,13 @@ class ConsensusPeersTest : public KuduTest {
                         0, // schema_version
                         /*metric_entity*/nullptr,
                         &log_));
-    clock_.reset(new clock::HybridClock());
+    clock_.reset(new clock::HybridClock(metric_entity_server_));
     ASSERT_OK(clock_->Init());
 
     time_manager_.reset(new TimeManager(clock_.get(), Timestamp::kMin));
 
     message_queue_.reset(new PeerMessageQueue(
-        metric_entity_,
+        metric_entity_tablet_,
         log_.get(),
         time_manager_.get(),
         FakeRaftPeerPB(kLeaderUuid),
@@ -164,7 +168,8 @@ class ConsensusPeersTest : public KuduTest {
 
  protected:
   MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<MetricEntity> metric_entity_server_;
+  scoped_refptr<MetricEntity> metric_entity_tablet_;
   unique_ptr<FsManager> fs_manager_;
   scoped_refptr<Log> log_;
   unique_ptr<ThreadPool> raft_pool_;
diff --git a/src/kudu/consensus/consensus_queue-test.cc 
b/src/kudu/consensus/consensus_queue-test.cc
index 572fe31..b77c8db 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -84,7 +84,10 @@ class ConsensusQueueTest : public KuduTest {
  public:
   ConsensusQueueTest()
       : schema_(GetSimpleTestSchema()),
-        metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, 
"queue-test")),
+        metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+            &metric_registry_, "consensus-queue-test::server")),
+        metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+            &metric_registry_, "consensus-queue-test::tablet")),
         registry_(new log::LogAnchorRegistry),
         quiescing_(false) {
   }
@@ -102,7 +105,7 @@ class ConsensusQueueTest : public KuduTest {
                             /*schema_version*/0,
                             /*metric_entity*/nullptr,
                             &log_));
-    clock_.reset(new clock::HybridClock());
+    clock_.reset(new clock::HybridClock(metric_entity_server_));
     ASSERT_OK(clock_->Init());
 
     ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -112,7 +115,7 @@ class ConsensusQueueTest : public KuduTest {
 
   void CloseAndReopenQueue(const OpId& replicated_opid, const OpId& 
committed_opid) {
     queue_.reset(new PeerMessageQueue(
-        metric_entity_,
+        metric_entity_tablet_,
         log_.get(),
         time_manager_.get(),
         FakeRaftPeerPB(kLeaderUuid),
@@ -234,9 +237,10 @@ class ConsensusQueueTest : public KuduTest {
 
  protected:
   const Schema schema_;
-  gscoped_ptr<FsManager> fs_manager_;
   MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<MetricEntity> metric_entity_server_;
+  scoped_refptr<MetricEntity> metric_entity_tablet_;
+  gscoped_ptr<FsManager> fs_manager_;
   scoped_refptr<log::Log> log_;
   unique_ptr<ThreadPool> raft_pool_;
   unique_ptr<TimeManager> time_manager_;
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 345492a..1ae43ae 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -156,7 +156,7 @@ class LogTestBase : public KuduTest {
     KuduTest::SetUp();
     current_index_ = kStartIndex;
     fs_manager_.reset(new FsManager(env_, FsManagerOpts(GetTestPath("fs_root"))));
-    metric_registry_.reset(new MetricRegistry());
+    metric_registry_.reset(new MetricRegistry);
     metric_entity_tablet_ = METRIC_ENTITY_tablet.Instantiate(
         metric_registry_.get(), "tablet");
     metric_entity_server_ = METRIC_ENTITY_server.Instantiate(
@@ -168,7 +168,7 @@ class LogTestBase : public KuduTest {
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
 
-    clock_.reset(new clock::HybridClock());
+    clock_.reset(new clock::HybridClock(metric_entity_server_));
     ASSERT_OK(clock_->Init());
   }
 
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc
index 8ebc97a..7057555 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -66,6 +66,7 @@ using strings::Substitute;
 DECLARE_int32(log_cache_size_limit_mb);
 DECLARE_int32(global_log_cache_size_limit_mb);
 
+METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
 
 namespace kudu {
@@ -77,8 +78,11 @@ static const char* kTestTablet = "test-tablet";
 class LogCacheTest : public KuduTest {
  public:
   LogCacheTest()
-    : schema_(GetSimpleTestSchema()),
-      metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "LogCacheTest")) {
+      : schema_(GetSimpleTestSchema()),
+        metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+            &metric_registry_, "LogCacheTest::server")),
+        metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+            &metric_registry_, "LogCacheTest::tablet")) {
   }
 
   virtual void SetUp() OVERRIDE {
@@ -96,7 +100,7 @@ class LogCacheTest : public KuduTest {
                             &log_));
 
     CloseAndReopenCache(MinimumOpId());
-    clock_.reset(new clock::HybridClock());
+    clock_.reset(new clock::HybridClock(metric_entity_server_));
     ASSERT_OK(clock_->Init());
   }
 
@@ -105,7 +109,7 @@ class LogCacheTest : public KuduTest {
   }
 
   void CloseAndReopenCache(const OpId& preceding_id) {
-    cache_.reset(new LogCache(metric_entity_,
+    cache_.reset(new LogCache(metric_entity_tablet_,
                               log_.get(),
                               kPeerUuid,
                               kTestTablet));
@@ -133,7 +137,8 @@ class LogCacheTest : public KuduTest {
 
   const Schema schema_;
   MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<MetricEntity> metric_entity_server_;
+  scoped_refptr<MetricEntity> metric_entity_tablet_;
   unique_ptr<FsManager> fs_manager_;
   unique_ptr<LogCache> cache_;
   scoped_refptr<log::Log> log_;
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 3a60f68..2fb196b 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -31,7 +31,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/schema.h"
@@ -119,9 +118,9 @@ class RaftConsensusQuorumTest : public KuduTest {
   typedef vector<unique_ptr<LogEntryPB>> LogEntries;
 
   RaftConsensusQuorumTest()
-    : clock_(clock::LogicalClock::CreateStartingAt(Timestamp(1))),
-      metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test")),
-      schema_(GetSimpleTestSchema()) {
+      : clock_(Timestamp(1)),
+        metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test")),
+        schema_(GetSimpleTestSchema()) {
     options_.tablet_id = kTestTablet;
     FLAGS_enable_leader_failure_detection = false;
     CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -217,7 +216,7 @@ class RaftConsensusQuorumTest : public KuduTest {
       unique_ptr<PeerProxyFactory> proxy_factory(
           new LocalTestPeerProxyFactory(peers_.get()));
       unique_ptr<TimeManager> time_manager(
-          new TimeManager(clock_.get(), Timestamp::kMin));
+          new TimeManager(&clock_, Timestamp::kMin));
       unique_ptr<TestTransactionFactory> txn_factory(
           new TestTransactionFactory(logs_[i].get()));
       txn_factory->SetConsensus(peer.get());
@@ -274,7 +273,7 @@ class RaftConsensusQuorumTest : public KuduTest {
     gscoped_ptr<ReplicateMsg> msg(new ReplicateMsg());
     msg->set_op_type(NO_OP);
     msg->mutable_noop_request();
-    msg->set_timestamp(clock_->Now().ToUint64());
+    msg->set_timestamp(clock_.Now().ToUint64());
 
     shared_ptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
@@ -584,7 +583,7 @@ class RaftConsensusQuorumTest : public KuduTest {
   unique_ptr<ThreadPool> raft_pool_;
   unique_ptr<TestPeerMapManager> peers_;
   vector<unique_ptr<TestTransactionFactory>> txn_factories_;
-  unique_ptr<clock::Clock> clock_;
+  clock::LogicalClock clock_;
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   const Schema schema_;
@@ -962,7 +961,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
 
   // Send a request with the next index.
   ReplicateMsg* replicate = req.add_ops();
-  replicate->set_timestamp(clock_->Now().ToUint64());
+  replicate->set_timestamp(clock_.Now().ToUint64());
   OpId* id = replicate->mutable_id();
   id->set_term(last_op_id.term());
   id->set_index(last_op_id.index() + 1);
diff --git a/src/kudu/consensus/time_manager-test.cc b/src/kudu/consensus/time_manager-test.cc
index c1f37d1..bd02933 100644
--- a/src/kudu/consensus/time_manager-test.cc
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -27,7 +27,9 @@
 #include "kudu/clock/hybrid_clock.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -37,15 +39,21 @@ using std::thread;
 using std::unique_ptr;
 using std::vector;
 
+METRIC_DECLARE_entity(server);
+
 namespace kudu {
 namespace consensus {
 
 class TimeManagerTest : public KuduTest {
  public:
-  TimeManagerTest() : clock_(new clock::HybridClock) {}
+  TimeManagerTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                        "time-manager-test")),
+        clock_(metric_entity_) {
+  }
 
   void SetUp() override {
-    CHECK_OK(clock_->Init());
+    CHECK_OK(clock_.Init());
   }
 
   void TearDown() override {
@@ -56,7 +64,7 @@ class TimeManagerTest : public KuduTest {
 
  protected:
   void InitTimeManager(Timestamp initial_safe_time = Timestamp::kMin) {
-    time_manager_.reset(new TimeManager(clock_.get(), initial_safe_time));
+    time_manager_.reset(new TimeManager(&clock_, initial_safe_time));
   }
 
   // Returns a latch that allows to wait for TimeManager to consider 'safe_time' safe.
@@ -72,7 +80,9 @@ class TimeManagerTest : public KuduTest {
     return latch;
   }
 
-  unique_ptr<clock::HybridClock> clock_;
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  clock::HybridClock clock_;
   unique_ptr<TimeManager> time_manager_;
   vector<unique_ptr<CountDownLatch>> latches_;
   vector<thread> threads_;
@@ -81,7 +91,7 @@ class TimeManagerTest : public KuduTest {
 // Tests TimeManager's functionality in non-leader mode and the transition to leader mode.
 TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
   // TimeManager should start in non-leader mode and consider the initial timestamp safe.
-  Timestamp before = clock_->Now();
+  Timestamp before = clock_.Now();
   Timestamp init(before.value() + 1);
   Timestamp after(init.value() + 1);
   InitTimeManager(init);
@@ -129,14 +139,14 @@ TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
 
   // Advance 'after' again and test advancing safe time with an explicit timestamp like
   // the leader sends on (empty) heartbeat messages.
-  after = clock_->Now();
+  after = clock_.Now();
   after_latch = WaitForSafeTimeAsync(after);
   time_manager_->AdvanceSafeTime(after);
   after_latch->Wait();
   ASSERT_EQ(time_manager_->GetSafeTime(), after);
 
   // Changing to leader mode should advance safe time.
-  after = clock_->Now();
+  after = clock_.Now();
   after_latch = WaitForSafeTimeAsync(after);
   time_manager_->SetLeaderMode();
   after_latch->Wait();
@@ -145,7 +155,7 @@ TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
 
 // Tests the TimeManager's functionality in leader mode and the transition to non-leader mode.
 TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
-  Timestamp init = clock_->Now();
+  Timestamp init = clock_.Now();
   InitTimeManager(init);
   time_manager_->SetLeaderMode();
   Timestamp safe_before = time_manager_->GetSafeTime();
@@ -165,7 +175,7 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
 
   // .. as should AdvanceSafeTime()
   EXPECT_DEATH({
-     time_manager_->AdvanceSafeTime(clock_->Now());
+     time_manager_->AdvanceSafeTime(clock_.Now());
     }, "Cannot advance safe time by timestamp in leader mode.");
 
   // Since we haven't appended the message to the queue, safe time should be 'pinned' to
@@ -177,13 +187,13 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
   ASSERT_GT(time_manager_->GetSafeTime(), message_ts);
 
   // 'Now' should be safe.
-  Timestamp now = clock_->Now();
+  Timestamp now = clock_.Now();
   ASSERT_TRUE(time_manager_->IsTimestampSafe(now));
   ASSERT_GT(time_manager_->GetSafeTime(), now);
 
   // When changing to non-leader mode a timestamp after the last safe time shouldn't be
   // safe anymore (even if that time came before the actual change).
-  now = clock_->Now();
+  now = clock_.Now();
   time_manager_->SetNonLeaderMode();
   Timestamp safe_after = time_manager_->GetSafeTime();
   ASSERT_LE(safe_after, now);
@@ -191,13 +201,13 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
   // In leader mode GetSafeTime() usually moves it, but since we changed to non-leader mode
   // safe time shouldn't move anymore ...
   ASSERT_EQ(time_manager_->GetSafeTime(), safe_after);
-  now = clock_->Now();
+  now = clock_.Now();
   MonoTime after_small = MonoTime::Now();
   after_small.AddDelta(MonoDelta::FromMilliseconds(100));
   ASSERT_TRUE(time_manager_->WaitUntilSafe(now, after_small).IsTimedOut());
 
   // ... unless we get a message from the leader.
-  now = clock_->Now();
+  now = clock_.Now();
   CountDownLatch* after_latch = WaitForSafeTimeAsync(now);
   message.set_timestamp(now.value());
   ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index 04541ac..f0f4e7e 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -33,7 +33,6 @@
 #include "kudu/client/client.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/client/write_op.h"
-#include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
@@ -88,7 +87,6 @@ using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::ExternalTabletServer;
 using kudu::cluster::ExternalMiniClusterOptions;
-using kudu::clock::Clock;
 using kudu::clock::HybridClock;
 using kudu::consensus::ConsensusMetadata;
 using kudu::consensus::ConsensusMetadataManager;
@@ -590,8 +588,11 @@ TEST_P(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
     ASSERT_OK(fs_manager->Open());
     scoped_refptr<ConsensusMetadataManager> cmeta_manager(
         new ConsensusMetadataManager(fs_manager.get()));
-    unique_ptr<Clock> clock(new HybridClock);
-    ASSERT_OK(clock->Init());
+    MetricRegistry metric_registry;
+    auto metric_entity(METRIC_ENTITY_server.Instantiate(&metric_registry,
+                                                        "ts-recoverty-itest"));
+    HybridClock clock(metric_entity);
+    ASSERT_OK(clock.Init());
 
     OpId opid;
     opid.set_term(kOverflowedIndexValue);
@@ -610,7 +611,7 @@ TEST_P(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
 
       // Write a series of negative OpIds.
       // This will cause a crash, but only after they have been written to disk.
-      ASSERT_OK(AppendNoOpsToLogSync(clock.get(), log.get(), &opid, kNumOverflowedEntriesToWrite));
+      ASSERT_OK(AppendNoOpsToLogSync(&clock, log.get(), &opid, kNumOverflowedEntriesToWrite));
     }, "Check failed: log_index > 0");
 
     // Before restarting the tablet server, delete the initial log segment from
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 74be49c..a5bf5ad 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -433,9 +433,10 @@ ServerBase::ServerBase(string name, const ServerBaseOptions& options,
   fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
 
   if (FLAGS_use_hybrid_clock) {
-    clock_.reset(new clock::HybridClock);
+    clock_.reset(new clock::HybridClock(metric_entity_));
   } else {
-    clock_ = clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
+    clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
+                                         metric_entity_));
   }
 
   if (FLAGS_webserver_enabled) {
@@ -544,7 +545,6 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK(rpc_server_->Init(messenger_));
   RETURN_NOT_OK(rpc_server_->Bind());
-  clock_->RegisterMetrics(metric_entity_);
 
   RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
 
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 65144e8..974fa18 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -109,12 +109,12 @@ static const size_t kSmallRollThreshold = 1024; // 1KB
 class TestCompaction : public KuduRowSetTest {
  public:
   TestCompaction()
-    : KuduRowSetTest(CreateSchema()),
-      op_id_(consensus::MaximumOpId()),
-      row_builder_(&schema_),
-      arena_(32*1024),
-      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
-      log_anchor_registry_(new log::LogAnchorRegistry()) {
+      : KuduRowSetTest(CreateSchema()),
+        op_id_(consensus::MaximumOpId()),
+        row_builder_(&schema_),
+        arena_(32*1024),
+        clock_(Timestamp::kInitialTimestamp),
+        log_anchor_registry_(new log::LogAnchorRegistry()) {
   }
 
   static Schema CreateSchema() {
@@ -137,7 +137,7 @@ class TestCompaction : public KuduRowSetTest {
   // The 'nullable_val' column is set to either NULL (when val is odd)
   // or 'val' (when val is even).
   void InsertRow(MemRowSet *mrs, int row_key, int32_t val) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     InsertRowInTransaction(mrs, tx, row_key, val);
     tx.Commit();
@@ -188,7 +188,7 @@ class TestCompaction : public KuduRowSetTest {
   }
 
   void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     UpdateRowInTransaction(rowset, tx, row_key, new_val);
     tx.Commit();
@@ -243,7 +243,7 @@ class TestCompaction : public KuduRowSetTest {
   }
 
   void DeleteRow(RowSet* rowset, int row_key) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     DeleteRowInTransaction(rowset, tx, row_key);
     tx.Commit();
@@ -484,7 +484,7 @@ class TestCompaction : public KuduRowSetTest {
   RowBuilder row_builder_;
   char key_buf_[256];
   Arena arena_;
-  unique_ptr<clock::LogicalClock> clock_;
+  clock::LogicalClock clock_;
   MvccManager mvcc_;
 
   scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
@@ -663,7 +663,7 @@ void TestCompaction::AddExpectedDelete(Mutation** current_head, Timestamp ts) {
   faststring buf;
   RowChangeListEncoder enc(&buf);
   enc.SetToDelete();
-  if (ts == Timestamp::kInvalidTimestamp) ts = Timestamp(clock_->GetCurrentTime());
+  if (ts == Timestamp::kInvalidTimestamp) ts = Timestamp(clock_.GetCurrentTime());
   Mutation* mutation = Mutation::CreateInArena(&arena_,
                                                ts,
                                                enc.as_changelist());
@@ -682,7 +682,7 @@ void TestCompaction::AddExpectedUpdate(Mutation** current_head, int32_t val) {
     enc.AddColumnUpdate(schema_.column(2), schema_.column_id(2), nullptr);
   }
   Mutation* mutation = Mutation::CreateInArena(&arena_,
-                                               Timestamp(clock_->GetCurrentTime()),
+                                               Timestamp(clock_.GetCurrentTime()),
                                                enc.as_changelist());
   mutation->set_next(*current_head);
   *current_head = mutation;
@@ -698,7 +698,8 @@ void TestCompaction::AddExpectedReinsert(Mutation** current_head, int32_t val) {
   } else {
     enc.EncodeColumnMutation(schema_.column(2), schema_.column_id(2), nullptr);
   }
-  Mutation* mutation = Mutation::CreateInArena(&arena_, Timestamp(clock_->GetCurrentTime()),
+  Mutation* mutation = Mutation::CreateInArena(&arena_,
+                                               Timestamp(clock_.GetCurrentTime()),
                                                enc.as_changelist());
   mutation->set_next(*current_head);
   *current_head = mutation;
@@ -857,7 +858,7 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) {
     shared_ptr<MemRowSet> mrs;
     ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
                                 mem_trackers_.tablet_tracker, &mrs));
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     DeleteRowInTransaction(rs1.get(), tx, 1);
     InsertRowInTransaction(mrs.get(), tx, 1, 2);
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index 09abc9a..4a37c39 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -34,7 +34,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
@@ -92,9 +91,9 @@ using fs::WritableBlock;
 class TestDeltaMemStore : public KuduTest {
  public:
   TestDeltaMemStore()
-    : op_id_(consensus::MaximumOpId()),
-      schema_(CreateSchema()),
-      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+      : op_id_(consensus::MaximumOpId()),
+        schema_(CreateSchema()),
+        clock_(Timestamp::kInitialTimestamp) {
     CHECK_OK(DeltaMemStore::Create(0, 0,
                                    new log::LogAnchorRegistry(),
                                    MemTracker::GetRootTracker(), &dms_));
@@ -115,7 +114,7 @@ class TestDeltaMemStore : public KuduTest {
     RowChangeListEncoder update(&buf);
 
     for (uint32_t idx_to_update : indexes_to_update) {
-      ScopedTransaction tx(&mvcc_, clock_->Now());
+      ScopedTransaction tx(&mvcc_, clock_.Now());
       tx.StartApplying();
       update.Reset();
       uint32_t new_val = idx_to_update * 10;
@@ -161,7 +160,7 @@ class TestDeltaMemStore : public KuduTest {
 
   const Schema schema_;
   shared_ptr<DeltaMemStore> dms_;
-  unique_ptr<clock::Clock> clock_;
+  clock::LogicalClock clock_;
   MvccManager mvcc_;
 };
 
@@ -193,7 +192,7 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
                              schema_.column_id(kStringColumn), &s);
     }
     if (idx % 2 == 0) {
-      ScopedTransaction tx(&mvcc_, clock_->Now());
+      ScopedTransaction tx(&mvcc_, clock_.Now());
       tx.StartApplying();
       uint32_t new_val = idx * 10;
       update.AddColumnUpdate(schema_.column(kIntColumn),
@@ -267,7 +266,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
     faststring buf;
     RowChangeListEncoder update(&buf);
 
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     string str(kStringDataSize, 'x');
     Slice s(str);
@@ -276,7 +275,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
     CHECK_OK(dms_->Update(tx.timestamp(), kIdxToUpdate, RowChangeList(buf), op_id_));
     tx.Commit();
   }
-  mvcc_.AdjustSafeTime(clock_->Now());
+  mvcc_.AdjustSafeTime(clock_.Now());
 
   MvccSnapshot snap(mvcc_);
   LOG_TIMING(INFO, "Applying updates") {
@@ -390,7 +389,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
   // the update gets cleared after usage. This ensures that the
   // underlying data is properly copied into the DMS arena.
   {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     char buf[256] = "update 1";
     Slice s(buf);
@@ -404,7 +403,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
 
   // Update the same cell again with a different value
   {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     char buf[256] = "update 2";
     Slice s(buf);
@@ -441,8 +440,8 @@ TEST_F(TestDeltaMemStore, TestOutOfOrderTxns) {
   RowChangeListEncoder update(&update_buf);
 
   {
-    ScopedTransaction tx1(&mvcc_, clock_->Now());
-    ScopedTransaction tx2(&mvcc_, clock_->Now());
+    ScopedTransaction tx1(&mvcc_, clock_.Now());
+    ScopedTransaction tx2(&mvcc_, clock_.Now());
 
     tx2.StartApplying();
     Slice s("update 2");
@@ -476,7 +475,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
 
   char buf[256];
   for (uint32_t i = 0; i < 1000; i++) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     update.Reset();
 
@@ -520,7 +519,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
   // these are separate transactions and we need to maintain the
   // old ones for snapshot consistency purposes.
   for (uint32_t i = 0; i < 1000; i++) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     update.Reset();
 
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 0c7190e..8b3dfbc 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -55,11 +55,11 @@ namespace tablet {
 class TestRowSet : public KuduRowSetTest {
  public:
   TestRowSet()
-    : KuduRowSetTest(CreateTestSchema()),
-      n_rows_(FLAGS_roundtrip_num_rows),
-      op_id_(consensus::MaximumOpId()),
-      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
-      log_anchor_registry_(new log::LogAnchorRegistry()) {
+      : KuduRowSetTest(CreateTestSchema()),
+        n_rows_(FLAGS_roundtrip_num_rows),
+        op_id_(consensus::MaximumOpId()),
+        clock_(Timestamp::kInitialTimestamp),
+        log_anchor_registry_(new log::LogAnchorRegistry()) {
     CHECK_GT(n_rows_, 0);
   }
 
@@ -185,7 +185,7 @@ class TestRowSet : public KuduRowSetTest {
     RowSetKeyProbe probe(rb.row());
 
     ProbeStats stats;
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     Status s = rs->MutateRow(tx.timestamp(), probe, mutation, op_id_, nullptr, &stats, result);
     tx.Commit();
@@ -339,7 +339,7 @@ class TestRowSet : public KuduRowSetTest {
 
   size_t n_rows_;
   consensus::OpId op_id_; // Generally a "fake" OpId for these tests.
-  std::unique_ptr<clock::Clock> clock_;
+  clock::LogicalClock clock_;
   MvccManager mvcc_;
   scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
 };
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 3437f7f..bc6f3f7 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -33,7 +33,6 @@
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
@@ -412,7 +411,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
   RowChangeListEncoder update(&update_buf);
   for (uint32_t i = 2; i <= 5; i++) {
     {
-      ScopedTransaction tx(&mvcc_, clock_->Now());
+      ScopedTransaction tx(&mvcc_, clock_.Now());
       tx.StartApplying();
       update.Reset();
       update.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &i);
@@ -658,7 +657,7 @@ TEST_F(TestRowSet, TestGCAncientStores) {
   // Delete all the UNDO deltas. There shouldn't be any delta stores left.
   int64_t blocks_deleted;
   int64_t bytes_deleted;
-  ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_->Now(), nullptr, &blocks_deleted, &bytes_deleted));
+  ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_.Now(), nullptr, &blocks_deleted, &bytes_deleted));
   ASSERT_GT(blocks_deleted, 0);
   ASSERT_GT(bytes_deleted, 0);
   ASSERT_EQ(0, dt->CountUndoDeltaStores());
@@ -710,7 +709,7 @@ class DiffScanRowSetTest : public KuduRowSetTest,
   DiffScanRowSetTest()
       : KuduRowSetTest(CreateTestSchema()),
         op_id_(consensus::MaximumOpId()),
-        clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+        clock_(Timestamp::kInitialTimestamp) {
   }
 
  protected:
@@ -723,7 +722,7 @@ class DiffScanRowSetTest : public KuduRowSetTest,
   }
 
   consensus::OpId op_id_;
-  unique_ptr<clock::Clock> clock_;
+  clock::LogicalClock clock_;
   MvccManager mvcc_;
 };
 
@@ -850,7 +849,7 @@ TEST_P(DiffScanRowSetTest, TestFuzz) {
     RowSetKeyProbe probe(rb.row());
 
     // Apply the mutation.
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     ProbeStats stats;
     OperationResultPB result;
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index f3a5c39..773a543 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -32,7 +32,6 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
@@ -87,7 +86,7 @@ class TestMemRowSet : public KuduTest {
       log_anchor_registry_(new LogAnchorRegistry()),
       schema_(CreateSchema()),
       key_schema_(schema_.CreateKeyProjection()),
-      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+      clock_(Timestamp::kInitialTimestamp) {
   }
 
   static Schema CreateSchema() {
@@ -148,7 +147,7 @@ class TestMemRowSet : public KuduTest {
   }
 
   Status InsertRow(MemRowSet *mrs, const string &key, uint32_t val) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     RowBuilder rb(&schema_);
     rb.AddString(key);
     rb.AddUint32(val);
@@ -162,7 +161,7 @@ class TestMemRowSet : public KuduTest {
                    const string &key,
                    uint32_t new_val,
                    OperationResultPB* result) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
 
     mutation_buf_.clear();
@@ -185,7 +184,7 @@ class TestMemRowSet : public KuduTest {
   }
 
   Status DeleteRow(MemRowSet *mrs, const string &key, OperationResultPB* result) {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
 
     mutation_buf_.clear();
@@ -261,7 +260,7 @@ class TestMemRowSet : public KuduTest {
   faststring mutation_buf_;
   const Schema schema_;
   const Schema key_schema_;
-  unique_ptr<clock::Clock> clock_;
+  clock::LogicalClock clock_;
   MvccManager mvcc_;
 };
 
@@ -309,7 +308,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
 
   RowBuilder rb(&compound_key_schema);
   {
-    ScopedTransaction tx(&mvcc_, clock_->Now());
+    ScopedTransaction tx(&mvcc_, clock_.Now());
     tx.StartApplying();
     rb.AddString(string("hello world"));
     rb.AddInt32(1);
@@ -320,7 +319,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
   }
 
   {
-    ScopedTransaction tx2(&mvcc_, clock_->Now());
+    ScopedTransaction tx2(&mvcc_, clock_.Now());
     tx2.StartApplying();
     rb.Reset();
     rb.AddString(string("goodbye world"));
@@ -332,7 +331,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
   }
 
   {
-    ScopedTransaction tx3(&mvcc_, clock_->Now());
+    ScopedTransaction tx3(&mvcc_, clock_.Now());
     tx3.StartApplying();
     rb.Reset();
     rb.AddString(string("goodbye world"));
@@ -549,7 +548,7 @@ TEST_F(TestMemRowSet, TestInsertionMVCC) {
   // Insert 5 rows in tx 0 through 4
   for (uint32_t i = 0; i < 5; i++) {
     {
-      ScopedTransaction tx(&mvcc_, clock_->Now());
+      ScopedTransaction tx(&mvcc_, clock_.Now());
       tx.StartApplying();
       RowBuilder rb(&schema_);
       char keybuf[256];
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index d7cf1a5..0b42177 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -27,11 +27,11 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -40,16 +40,15 @@
 using std::thread;
 using std::unique_ptr;
 
+METRIC_DECLARE_entity(server);
+
 namespace kudu {
 namespace tablet {
 
-using clock::Clock;
-using clock::HybridClock;
-
 class MvccTest : public KuduTest {
  public:
   MvccTest()
-      : clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+      : clock_(Timestamp::kInitialTimestamp) {
   }
 
   void WaitForSnapshotAtTSThread(MvccManager* mgr, Timestamp ts) {
@@ -66,9 +65,8 @@ class MvccTest : public KuduTest {
   }
 
  protected:
-  unique_ptr<clock::Clock> clock_;
-
-  mutable simple_spinlock lock_;
+  clock::LogicalClock clock_;
+  simple_spinlock lock_;
   unique_ptr<MvccSnapshot> result_snapshot_;
 };
 
@@ -83,7 +81,7 @@ TEST_F(MvccTest, TestMvccBasic) {
   ASSERT_FALSE(snap.IsCommitted(Timestamp(2)));
 
   // Start timestamp 1
-  Timestamp t = clock_->Now();
+  Timestamp t = clock_.Now();
   ASSERT_EQ(1, t.value());
   mgr.StartTransaction(t);
 
@@ -113,10 +111,10 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
   MvccManager mgr;
   MvccSnapshot snap;
 
-  Timestamp t1 = clock_->Now();
+  Timestamp t1 = clock_.Now();
   ASSERT_EQ(1, t1.value());
   mgr.StartTransaction(t1);
-  Timestamp t2 = clock_->Now();
+  Timestamp t2 = clock_.Now();
   ASSERT_EQ(2, t2.value());
   mgr.StartTransaction(t2);
 
@@ -140,7 +138,7 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
   ASSERT_TRUE(snap.IsCommitted(t2));
 
   // Start another transaction. This gets timestamp 3
-  Timestamp t3 = clock_->Now();
+  Timestamp t3 = clock_.Now();
   ASSERT_EQ(3, t3.value());
   mgr.StartTransaction(t3);
 
@@ -182,18 +180,20 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
 }
 
 TEST_F(MvccTest, TestOutOfOrderTxns) {
-  unique_ptr<Clock> hybrid_clock(new HybridClock);
-  ASSERT_OK(hybrid_clock->Init());
+  MetricRegistry metric_registry;
+  auto metric_entity(METRIC_ENTITY_server.Instantiate(&metric_registry, "mvcc-test"));
+  clock::HybridClock hybrid_clock(metric_entity);
+  ASSERT_OK(hybrid_clock.Init());
   MvccManager mgr;
 
   // Start a normal non-commit-wait txn.
-  Timestamp normal_txn = hybrid_clock->Now();
+  Timestamp normal_txn = hybrid_clock.Now();
   mgr.StartTransaction(normal_txn);
 
   MvccSnapshot s1(mgr);
 
   // Start a transaction as if it were using commit-wait (i.e. started in future)
-  Timestamp cw_txn = hybrid_clock->NowLatest();
+  Timestamp cw_txn = hybrid_clock.NowLatest();
   mgr.StartTransaction(cw_txn);
 
   // Commit the original txn
@@ -201,7 +201,7 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
   mgr.CommitTransaction(normal_txn);
 
   // Start a new txn
-  Timestamp normal_txn_2 = hybrid_clock->Now();
+  Timestamp normal_txn_2 = hybrid_clock.Now();
   mgr.StartTransaction(normal_txn_2);
 
   // The old snapshot should not have either txn
@@ -214,7 +214,7 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
   EXPECT_FALSE(s2.IsCommitted(normal_txn_2));
 
   // Commit the commit-wait one once it is time.
-  ASSERT_OK(hybrid_clock->WaitUntilAfter(cw_txn, MonoTime::Max()));
+  ASSERT_OK(hybrid_clock.WaitUntilAfter(cw_txn, MonoTime::Max()));
   mgr.StartApplyingTransaction(cw_txn);
   mgr.CommitTransaction(cw_txn);
 
@@ -229,7 +229,7 @@ TEST_F(MvccTest, TestSafeTimeWithOutOfOrderTxns) {
   MvccManager mgr;
 
   // Set the clock to some time in the "future".
-  ASSERT_OK(clock_->Update(Timestamp(100)));
+  ASSERT_OK(clock_.Update(Timestamp(100)));
 
   // Start a transaction in the "past"
   Timestamp txn_in_the_past(50);
@@ -266,8 +266,8 @@ TEST_F(MvccTest, TestScopedTransaction) {
   MvccSnapshot snap;
 
   {
-    ScopedTransaction t1(&mgr, clock_->Now());
-    ScopedTransaction t2(&mgr, clock_->Now());
+    ScopedTransaction t1(&mgr, clock_.Now());
+    ScopedTransaction t2(&mgr, clock_.Now());
 
     ASSERT_EQ(1, t1.timestamp().value());
     ASSERT_EQ(2, t2.timestamp().value());
@@ -289,7 +289,7 @@ TEST_F(MvccTest, TestScopedTransaction) {
   // scope while the MvccManager is closed.
   mgr.Close();
   {
-    ScopedTransaction t(&mgr, clock_->Now());
+    ScopedTransaction t(&mgr, clock_.Now());
     NO_FATALS(t.StartApplying());
   }
 }
@@ -391,13 +391,13 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
   MvccManager mgr;
 
   // start several transactions and take snapshots along the way
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
-  Timestamp tx3 = clock_->Now();
+  Timestamp tx3 = clock_.Now();
   mgr.StartTransaction(tx3);
-  mgr.AdjustSafeTime(clock_->Now());
+  mgr.AdjustSafeTime(clock_.Now());
 
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(1)));
   ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(2)));
@@ -427,10 +427,10 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
   ASSERT_TRUE(mgr.AreAllTransactionsCommitted(Timestamp(3)));
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithNoInflights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapWithNoInflights) {
   MvccManager mgr;
-  Timestamp to_wait_for = clock_->Now();
-  mgr.AdjustSafeTime(clock_->Now());
+  Timestamp to_wait_for = clock_.Now();
+  mgr.AdjustSafeTime(clock_.Now());
   thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
 
   // join immediately.
@@ -438,19 +438,19 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithNoInflights) {
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapBeforeSafeTimeWithInFlights) {
   MvccManager mgr;
 
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
   mgr.AdjustSafeTime(tx2);
-  Timestamp to_wait_for = clock_->Now();
+  Timestamp to_wait_for = clock_.Now();
 
   // Select a safe time that is after all transactions and after the the timestamp we'll wait for
   // and adjust it on the MvccManager. This will cause "clean time" to move when tx1 and tx2 commit.
-  Timestamp safe_time = clock_->Now();
+  Timestamp safe_time = clock_.Now();
   mgr.AdjustSafeTime(safe_time);
 
   thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this, &mgr, to_wait_for);
@@ -465,11 +465,11 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
   ASSERT_TRUE(HasResultSnapshot());
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapAfterSafeTimeWithInFlights) {
   MvccManager mgr;
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
   mgr.AdjustSafeTime(tx2);
 
@@ -499,15 +499,15 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
   ASSERT_OK(s);
 }
 
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapAtTimestampWithInFlights) {
   MvccManager mgr;
 
   // Transactions with timestamp 1 through 3
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
-  Timestamp tx3 = clock_->Now();
+  Timestamp tx3 = clock_.Now();
   mgr.StartTransaction(tx3);
 
   // Start a thread waiting for transactions with ts <= 2 to commit
@@ -540,9 +540,9 @@ TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
 TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
   MvccManager mgr;
 
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
   mgr.AdjustSafeTime(tx2);
 
@@ -572,7 +572,7 @@ TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
 // instead return an error.
 TEST_F(MvccTest, TestDontWaitAfterClose) {
   MvccManager mgr;
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
   mgr.AdjustSafeTime(tx1);
   mgr.StartApplyingTransaction(tx1);
@@ -612,11 +612,11 @@ TEST_F(MvccTest, TestTxnAbort) {
   MvccManager mgr;
 
   // Transactions with timestamps 1 through 3
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
-  Timestamp tx2 = clock_->Now();
+  Timestamp tx2 = clock_.Now();
   mgr.StartTransaction(tx2);
-  Timestamp tx3 = clock_->Now();
+  Timestamp tx3 = clock_.Now();
   mgr.StartTransaction(tx3);
   mgr.AdjustSafeTime(tx3);
 
@@ -645,7 +645,7 @@ TEST_F(MvccTest, TestTxnAbort) {
 TEST_F(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit) {
 
   MvccManager mgr;
-  clock_->Update(Timestamp(20));
+  clock_.Update(Timestamp(20));
 
   mgr.StartTransaction(Timestamp(10));
   mgr.StartTransaction(Timestamp(15));
@@ -683,7 +683,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
     "Trying to commit a transaction with a future timestamp|"
     "Trying to remove timestamp which isn't in the in-flight set: 1");
 
-  clock_->Update(Timestamp(20));
+  clock_.Update(Timestamp(20));
 
   EXPECT_DEATH({
       mgr.CommitTransaction(Timestamp(1));
@@ -691,7 +691,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
 
   // Start a transaction, and try committing it without having moved to "Applying"
   // state.
-  Timestamp t = clock_->Now();
+  Timestamp t = clock_.Now();
   mgr.StartTransaction(t);
   EXPECT_DEATH({
       mgr.CommitTransaction(t);
@@ -706,7 +706,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
     }, "Trying to remove timestamp which isn't in the in-flight set: 21");
 
   // Start a new transaction. This time, mark it as Applying.
-  t = clock_->Now();
+  t = clock_.Now();
   mgr.StartTransaction(t);
   mgr.AdjustSafeTime(t);
   mgr.StartApplyingTransaction(t);
@@ -729,7 +729,7 @@ TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
   MvccManager mgr;
 
   // Transactions with timestamp 1
-  Timestamp tx1 = clock_->Now();
+  Timestamp tx1 = clock_.Now();
   mgr.StartTransaction(tx1);
 
   // Wait until the 'tx1' timestamp is clean -- this won't happen because the
@@ -758,10 +758,12 @@ TEST_F(MvccTest, TestCorrectInitWithNoTxns) {
   EXPECT_EQ(snap.committed_timestamps_.size(), 0);
 
   // Read the clock a few times to advance the timestamp
-  for (int i = 0; i < 10; i++) clock_->Now();
+  for (int i = 0; i < 10; i++) {
+    clock_.Now();
+  }
 
   // Advance the safe timestamp.
-  Timestamp new_safe_time = clock_->Now();
+  Timestamp new_safe_time = clock_.Now();
   mgr.AdjustSafeTime(new_safe_time);
 
   // Test that the snapshot reports that a timestamp lower than the safe time
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 62ccf13..65336d4 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -297,7 +297,7 @@ class MvccManager {
   FRIEND_TEST(MvccTest, TestTxnAbort);
   FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit);
   FRIEND_TEST(MvccTest, TestWaitForApplyingTransactionsToCommit);
-  FRIEND_TEST(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights);
+  FRIEND_TEST(MvccTest, WaitForCleanSnapshotSnapAfterSafeTimeWithInFlights);
   FRIEND_TEST(MvccTest, TestDontWaitAfterClose);
 
   enum TxnState {
diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h
index 2a60cdd..c3197e9 100644
--- a/src/kudu/tablet/tablet-harness.h
+++ b/src/kudu/tablet/tablet-harness.h
@@ -34,6 +34,8 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 
+METRIC_DECLARE_entity(server);
+
 namespace kudu {
 namespace tablet {
 
@@ -67,13 +69,11 @@ class TabletHarness {
         : env(Env::Default()),
           tablet_id("test_tablet_id"),
           root_dir(std::move(root_dir)),
-          enable_metrics(true),
           clock_type(LOGICAL_CLOCK) {}
 
     Env* env;
     std::string tablet_id;
     std::string root_dir;
-    bool enable_metrics;
     ClockType clock_type;
   };
 
@@ -103,21 +103,25 @@ class TabletHarness {
                                                /*extra_config=*/ boost::none,
                                                /*dimension_label=*/ boost::none,
                                                &metadata));
-    if (options_.enable_metrics) {
-      metrics_registry_.reset(new MetricRegistry());
-    }
-
-    if (options_.clock_type == Options::LOGICAL_CLOCK) {
-      clock_ = clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
-    } else {
-      clock_.reset(new clock::HybridClock());
-      RETURN_NOT_OK(clock_->Init());
+    metrics_registry_.reset(new MetricRegistry);
+    metric_entity_ = METRIC_ENTITY_server.Instantiate(metrics_registry_.get(),
+                                                      "tablet-harness");
+
+    switch (options_.clock_type) {
+      case Options::HYBRID_CLOCK:
+        clock_.reset(new clock::HybridClock(metric_entity_));
+        break;
+      case Options::LOGICAL_CLOCK:
+        clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
+                                             metric_entity_));
+        break;
     }
+    RETURN_NOT_OK(clock_->Init());
     tablet_.reset(new Tablet(metadata,
                              clock_.get(),
-                             std::shared_ptr<MemTracker>(),
+                             {},
                              metrics_registry_.get(),
-                             make_scoped_refptr(new log::LogAnchorRegistry())));
+                             make_scoped_refptr(new log::LogAnchorRegistry)));
     return Status::OK();
   }
 
@@ -149,7 +153,8 @@ class TabletHarness {
  private:
   Options options_;
 
-  gscoped_ptr<MetricRegistry> metrics_registry_;
+  std::unique_ptr<MetricRegistry> metrics_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
 
   std::unique_ptr<clock::Clock> clock_;
   Schema schema_;
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 3198b3d..5e0bcc9 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -111,7 +111,6 @@ class KuduTabletTest : public KuduTest {
   void CreateTestTablet(const std::string& root_dir = "") {
     std::string dir = root_dir.empty() ? GetTestPath("fs_root") : root_dir;
     TabletHarness::Options opts(dir);
-    opts.enable_metrics = true;
     opts.clock_type = clock_type_;
     bool first_time = harness_ == nullptr;
     harness_.reset(new TabletHarness(schema_, opts));
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index c961c8a..a547a26 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -186,7 +186,6 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/client/write_op.h"
-#include "kudu/clock/clock.h"
 #include "kudu/clock/logical_clock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
@@ -241,7 +240,6 @@ using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
-using kudu::clock::Clock;
 using kudu::clock::LogicalClock;
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::consensus::ConsensusMetadata;
@@ -814,8 +812,8 @@ Status TabletScan(const RunnerContext& context) {
   scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
 
-  unique_ptr<Clock> clock(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp));
-  RETURN_NOT_OK(clock->Init());
+  LogicalClock clock(Timestamp::kInitialTimestamp);
+  RETURN_NOT_OK(clock.Init());
 
   scoped_refptr<LogAnchorRegistry> registry(new LogAnchorRegistry());
 
@@ -825,7 +823,7 @@ Status TabletScan(const RunnerContext& context) {
   ConsensusBootstrapInfo cbi;
   RETURN_NOT_OK(tablet::BootstrapTablet(std::move(tmeta),
                                         cmeta->CommittedConfig(),
-                                        clock.get(),
+                                        &clock,
                                         /*mem_tracker=*/ nullptr,
                                         /*result_tracker=*/ nullptr,
                                         /*metric_registry=*/ nullptr,

Reply via email to