This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8808b04 [clock] update on Clock interface
8808b04 is described below
commit 8808b041c9db0af7642311390d7d9189032cc36a
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Feb 12 19:18:03 2020 -0800
[clock] update on Clock interface
This patch re-factors Clock-related classes:
* removed Clock::RegisterMetrics() method
* HybridClock constructor requires metric entity
* LogicalClock constructor accepts metric entity as optional
second parameter
* LogicalClock constructor is now public
* LogicalClock::CreateStartingAt() is gone
I also did other minor re-factoring, partially due to warnings reported
by ClangTidy on the code I touched.
The motivation for this change is to prepare for follow-up changelists
addressing KUDU-3048 (adding clock metrics for better observability).
Change-Id: Ic4c1944d54bf50e54c06c12e2fb9e57fc452b877
Reviewed-on: http://gerrit.cloudera.org:8080/15215
Tested-by: Kudu Jenkins
Reviewed-by: Volodymyr Verovkin <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
---
src/kudu/clock/clock.h | 3 -
src/kudu/clock/hybrid_clock-test.cc | 147 ++++++++++++-----------
src/kudu/clock/hybrid_clock.cc | 24 ++--
src/kudu/clock/hybrid_clock.h | 12 +-
src/kudu/clock/logical_clock-test.cc | 44 +++----
src/kudu/clock/logical_clock.cc | 23 ++--
src/kudu/clock/logical_clock.h | 12 +-
src/kudu/consensus/consensus_peers-test.cc | 29 +++--
src/kudu/consensus/consensus_queue-test.cc | 14 ++-
src/kudu/consensus/log-test-base.h | 4 +-
src/kudu/consensus/log_cache-test.cc | 15 ++-
src/kudu/consensus/raft_consensus_quorum-test.cc | 15 ++-
src/kudu/consensus/time_manager-test.cc | 36 ++++--
src/kudu/integration-tests/ts_recovery-itest.cc | 11 +-
src/kudu/server/server_base.cc | 6 +-
src/kudu/tablet/compaction-test.cc | 29 ++---
src/kudu/tablet/deltamemstore-test.cc | 29 +++--
src/kudu/tablet/diskrowset-test-base.h | 14 +--
src/kudu/tablet/diskrowset-test.cc | 11 +-
src/kudu/tablet/memrowset-test.cc | 19 ++-
src/kudu/tablet/mvcc-test.cc | 110 ++++++++---------
src/kudu/tablet/mvcc.h | 2 +-
src/kudu/tablet/tablet-harness.h | 33 ++---
src/kudu/tablet/tablet-test-util.h | 1 -
src/kudu/tools/tool_action_perf.cc | 8 +-
25 files changed, 336 insertions(+), 315 deletions(-)
diff --git a/src/kudu/clock/clock.h b/src/kudu/clock/clock.h
index 79ac84e..020679c 100644
--- a/src/kudu/clock/clock.h
+++ b/src/kudu/clock/clock.h
@@ -103,9 +103,6 @@ class Clock {
// to Now() would return a higher value than t).
virtual bool IsAfter(Timestamp t) = 0;
- // Register the clock metrics in the given entity.
- virtual void RegisterMetrics(const scoped_refptr<MetricEntity>&
metric_entity) = 0;
-
// Strigifies the provided timestamp according to this clock's internal
format.
virtual std::string Stringify(Timestamp timestamp) = 0;
};
diff --git a/src/kudu/clock/hybrid_clock-test.cc
b/src/kudu/clock/hybrid_clock-test.cc
index 422daf4..378ec78 100644
--- a/src/kudu/clock/hybrid_clock-test.cc
+++ b/src/kudu/clock/hybrid_clock-test.cc
@@ -19,7 +19,6 @@
#include <algorithm>
#include <cstdint>
-#include <memory>
#include <string>
#include <vector>
@@ -34,6 +33,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/util/atomic.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
@@ -43,60 +43,59 @@
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
-
DECLARE_bool(inject_unsync_time_errors);
DECLARE_string(time_source);
+METRIC_DECLARE_entity(server);
using std::string;
-using std::unique_ptr;
using std::vector;
namespace kudu {
namespace clock {
-class HybridClockTest : public KuduTest {
+class ClockTest : public KuduTest {
public:
- HybridClockTest()
- : clock_(new HybridClock) {
- }
-
- void SetUp() override {
- KuduTest::SetUp();
- ASSERT_OK(clock_->Init());
+ ClockTest()
+ : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+ "clock-test")) {
}
protected:
- unique_ptr<HybridClock> clock_;
+ MetricRegistry metric_registry_;
+ scoped_refptr<MetricEntity> metric_entity_;
+};
+
+class MockHybridClockTest : public ClockTest {
};
-clock::MockNtp* mock_ntp(HybridClock* clock) {
- return down_cast<clock::MockNtp*>(clock->time_service());
+clock::MockNtp* mock_ntp(const HybridClock& clock) {
+ return down_cast<clock::MockNtp*>(clock.time_service());
}
-TEST(MockHybridClockTest, TestMockedSystemClock) {
+TEST_F(MockHybridClockTest, TestMockedSystemClock) {
google::FlagSaver saver;
FLAGS_time_source = "mock";
- unique_ptr<HybridClock> clock(new HybridClock);
- ASSERT_OK(clock->Init());
+ HybridClock clock(metric_entity_);
+ ASSERT_OK(clock.Init());
Timestamp timestamp;
uint64_t max_error_usec;
- clock->NowWithError(×tamp, &max_error_usec);
+ clock.NowWithError(×tamp, &max_error_usec);
ASSERT_EQ(timestamp.ToUint64(), 0);
ASSERT_EQ(max_error_usec, 0);
// If we read the clock again we should see the logical component be
incremented.
- clock->NowWithError(×tamp, &max_error_usec);
+ clock.NowWithError(×tamp, &max_error_usec);
ASSERT_EQ(timestamp.ToUint64(), 1);
// Now set an arbitrary time and check that is the time returned by the
clock.
uint64_t time = 1234 * 1000;
uint64_t error = 100 * 1000;
- mock_ntp(clock.get())->SetMockClockWallTimeForTests(time);
- mock_ntp(clock.get())->SetMockMaxClockErrorForTests(error);
- clock->NowWithError(×tamp, &max_error_usec);
+ mock_ntp(clock)->SetMockClockWallTimeForTests(time);
+ mock_ntp(clock)->SetMockMaxClockErrorForTests(error);
+ clock.NowWithError(×tamp, &max_error_usec);
ASSERT_EQ(timestamp.ToUint64(),
HybridClock::TimestampFromMicrosecondsAndLogicalValue(time,
0).ToUint64());
ASSERT_EQ(max_error_usec, error);
// Perform another read, we should observe the logical component increment,
again.
- clock->NowWithError(×tamp, &max_error_usec);
+ clock.NowWithError(×tamp, &max_error_usec);
ASSERT_EQ(timestamp.ToUint64(),
HybridClock::TimestampFromMicrosecondsAndLogicalValue(time,
1).ToUint64());
}
@@ -107,22 +106,22 @@ TEST(MockHybridClockTest, TestMockedSystemClock) {
// guarantees even as the physical clock continues to increase.
//
// This is a regression test for KUDU-1345.
-TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
+TEST_F(MockHybridClockTest, TestClockDealsWithWrapping) {
google::FlagSaver saver;
FLAGS_time_source = "mock";
- unique_ptr<HybridClock> clock(new HybridClock);
- ASSERT_OK(clock->Init());
- mock_ntp(clock.get())->SetMockClockWallTimeForTests(1000);
+ HybridClock clock(metric_entity_);
+ ASSERT_OK(clock.Init());
+ mock_ntp(clock)->SetMockClockWallTimeForTests(1000);
- Timestamp prev = clock->Now();
+ Timestamp prev = clock.Now();
// Update the clock from 10us in the future
- ASSERT_OK(clock->Update(HybridClock::TimestampFromMicroseconds(1010)));
+ ASSERT_OK(clock.Update(HybridClock::TimestampFromMicroseconds(1010)));
// Now read the clock value enough times so that the logical value wraps
// over, and should increment the _physical_ portion of the clock.
for (int i = 0; i < 10000; i++) {
- Timestamp now = clock->Now();
+ Timestamp now = clock.Now();
ASSERT_GT(now.value(), prev.value());
prev = now;
}
@@ -131,8 +130,8 @@ TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
// Advance the time microsecond by microsecond, and ensure the clock never
// goes backwards.
for (int time = 1001; time < 1020; time++) {
- mock_ntp(clock.get())->SetMockClockWallTimeForTests(time);
- Timestamp now = clock->Now();
+ mock_ntp(clock)->SetMockClockWallTimeForTests(time);
+ Timestamp now = clock.Now();
// Clock should run strictly forwards.
ASSERT_GT(now.value(), prev.value());
@@ -150,16 +149,31 @@ TEST(MockHybridClockTest, TestClockDealsWithWrapping) {
}
}
+class HybridClockTest : public ClockTest {
+ public:
+ HybridClockTest()
+ : clock_(metric_entity_) {
+ }
+
+ void SetUp() override {
+ KuduTest::SetUp();
+ ASSERT_OK(clock_.Init());
+ }
+
+ protected:
+ HybridClock clock_;
+};
+
// Test that two subsequent time reads are monotonically increasing.
-TEST_F(HybridClockTest, TestNow_ValuesIncreaseMonotonically) {
- const Timestamp now1 = clock_->Now();
- const Timestamp now2 = clock_->Now();
+TEST_F(HybridClockTest, NowValuesIncreaseMonotonically) {
+ const Timestamp now1 = clock_.Now();
+ const Timestamp now2 = clock_.Now();
ASSERT_LT(now1.value(), now2.value());
}
// Tests the clock updates with the incoming value if it is higher.
-TEST_F(HybridClockTest, TestUpdate_LogicalValueIncreasesByAmount) {
- Timestamp now = clock_->Now();
+TEST_F(HybridClockTest, UpdateLogicalValueIncreasesByAmount) {
+ Timestamp now = clock_.Now();
uint64_t now_micros = HybridClock::GetPhysicalValueMicros(now);
// increase the logical value
@@ -170,31 +184,31 @@ TEST_F(HybridClockTest,
TestUpdate_LogicalValueIncreasesByAmount) {
// one, 200 msecs should be more than enough.
now_micros += 200000;
- Timestamp now_increased =
HybridClock::TimestampFromMicrosecondsAndLogicalValue(now_micros,
-
logical);
+ auto now_increased = HybridClock::TimestampFromMicrosecondsAndLogicalValue(
+ now_micros, logical);
- ASSERT_OK(clock_->Update(now_increased));
+ ASSERT_OK(clock_.Update(now_increased));
- Timestamp now2 = clock_->Now();
+ Timestamp now2 = clock_.Now();
ASSERT_EQ(logical + 1, HybridClock::GetLogicalValue(now2));
ASSERT_EQ(HybridClock::GetPhysicalValueMicros(now) + 200000,
HybridClock::GetPhysicalValueMicros(now2));
}
// Test that the incoming event is in the past, i.e. less than now - max_error
-TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase1) {
+TEST_F(HybridClockTest, WaitUntilAfterCase1) {
MonoTime no_deadline;
MonoTime before = MonoTime::Now();
Timestamp past_ts;
uint64_t max_error;
- clock_->NowWithError(&past_ts, &max_error);
+ clock_.NowWithError(&past_ts, &max_error);
// make the event 3 * the max. possible error in the past
Timestamp past_ts_changed = HybridClock::AddPhysicalTimeToTimestamp(
past_ts,
MonoDelta::FromMicroseconds(-3 * static_cast<int64_t>(max_error)));
- ASSERT_OK(clock_->WaitUntilAfter(past_ts_changed, no_deadline));
+ ASSERT_OK(clock_.WaitUntilAfter(past_ts_changed, no_deadline));
MonoTime after = MonoTime::Now();
MonoDelta delta = after - before;
@@ -205,14 +219,14 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase1) {
// The normal case for transactions. Obtain a timestamp and then wait until
// we're sure that tx_latest < now_earliest.
-TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
+TEST_F(HybridClockTest, WaitUntilAfterCase2) {
const MonoTime before = MonoTime::Now();
// we do no time adjustment, this event should fall right within the possible
// error interval
Timestamp past_ts;
uint64_t past_max_error;
- clock_->NowWithError(&past_ts, &past_max_error);
+ clock_.NowWithError(&past_ts, &past_max_error);
// Make sure the error is at least a small number of microseconds, to ensure
// that we always have to wait.
past_max_error = std::max(past_max_error, static_cast<uint64_t>(2000));
@@ -222,19 +236,19 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
Timestamp current_ts;
uint64_t current_max_error;
- clock_->NowWithError(¤t_ts, ¤t_max_error);
+ clock_.NowWithError(¤t_ts, ¤t_max_error);
// Check waiting with a deadline which already expired.
{
MonoTime deadline = before;
- Status s = clock_->WaitUntilAfter(wait_until, deadline);
+ Status s = clock_.WaitUntilAfter(wait_until, deadline);
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
}
// Wait with a deadline well in the future. This should succeed.
{
MonoTime deadline = before + MonoDelta::FromSeconds(60);
- ASSERT_OK(clock_->WaitUntilAfter(wait_until, deadline));
+ ASSERT_OK(clock_.WaitUntilAfter(wait_until, deadline));
}
MonoTime after = MonoTime::Now();
@@ -251,19 +265,19 @@ TEST_F(HybridClockTest, TestWaitUntilAfter_TestCase2) {
}
TEST_F(HybridClockTest, TestIsAfter) {
- Timestamp ts1 = clock_->Now();
- ASSERT_TRUE(clock_->IsAfter(ts1));
+ Timestamp ts1 = clock_.Now();
+ ASSERT_TRUE(clock_.IsAfter(ts1));
// Update the clock in the future, make sure it still
// handles "IsAfter" properly even when it's running in
// "logical" mode.
Timestamp now_increased = HybridClock::TimestampFromMicroseconds(
HybridClock::GetPhysicalValueMicros(ts1) + 1 * 1000 * 1000);
- ASSERT_OK(clock_->Update(now_increased));
- Timestamp ts2 = clock_->Now();
+ ASSERT_OK(clock_.Update(now_increased));
+ Timestamp ts2 = clock_.Now();
- ASSERT_TRUE(clock_->IsAfter(ts1));
- ASSERT_TRUE(clock_->IsAfter(ts2));
+ ASSERT_TRUE(clock_.IsAfter(ts1));
+ ASSERT_TRUE(clock_.IsAfter(ts2));
}
// Thread which loops polling the clock and updating it slightly
@@ -298,8 +312,7 @@ TEST_F(HybridClockTest,
TestClockDoesntGoBackwardsWithUpdates) {
for (int i = 0; i < 4; i++) {
scoped_refptr<Thread> thread;
- ASSERT_OK(Thread::Create("test", "stresser",
- &StresserThread, clock_.get(), &stop,
+ ASSERT_OK(Thread::Create("test", "stresser", &StresserThread, &clock_,
&stop,
&thread));
threads.push_back(thread);
}
@@ -311,8 +324,8 @@ TEST_F(HybridClockTest, TestGetPhysicalComponentDifference)
{
Timestamp now1 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(100,
100);
SleepFor(MonoDelta::FromMilliseconds(1));
Timestamp now2 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(200,
0);
- MonoDelta delta = clock_->GetPhysicalComponentDifference(now2, now1);
- MonoDelta negative_delta = clock_->GetPhysicalComponentDifference(now1,
now2);
+ MonoDelta delta = clock_.GetPhysicalComponentDifference(now2, now1);
+ MonoDelta negative_delta = clock_.GetPhysicalComponentDifference(now1, now2);
ASSERT_EQ(100, delta.ToMicroseconds());
ASSERT_EQ(-100, negative_delta.ToMicroseconds());
}
@@ -322,31 +335,31 @@ TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
uint64_t max_error_usec[3];
// Get the clock once, with a working NTP.
- clock_->NowWithError(×tamps[0], &max_error_usec[0]);
+ clock_.NowWithError(×tamps[0], &max_error_usec[0]);
// Try to read the clock again a second later, but with an error
// injected. It should extrapolate from the first read.
SleepFor(MonoDelta::FromSeconds(1));
FLAGS_inject_unsync_time_errors = true;
- clock_->NowWithError(×tamps[1], &max_error_usec[1]);
+ clock_.NowWithError(×tamps[1], &max_error_usec[1]);
// The new clock reading should be a second or longer from the
// first one, since SleepFor guarantees sleeping at least as long
// as specified.
- MonoDelta phys_diff = clock_->GetPhysicalComponentDifference(
+ MonoDelta phys_diff = clock_.GetPhysicalComponentDifference(
timestamps[1], timestamps[0]);
ASSERT_GE(phys_diff.ToSeconds(), 1);
// The new clock reading should have higher error than the first.
// The error should have increased based on the clock skew.
int64_t error_diff = max_error_usec[1] - max_error_usec[0];
- ASSERT_NEAR(error_diff, clock_->time_service()->skew_ppm() *
phys_diff.ToSeconds(),
+ ASSERT_NEAR(error_diff, clock_.time_service()->skew_ppm() *
phys_diff.ToSeconds(),
10);
// Now restore the ability to read the system clock, and
// read it again.
FLAGS_inject_unsync_time_errors = false;
- clock_->NowWithError(×tamps[2], &max_error_usec[2]);
+ clock_.NowWithError(×tamps[2], &max_error_usec[2]);
ASSERT_LT(timestamps[0].ToUint64(), timestamps[1].ToUint64());
ASSERT_LT(timestamps[1].ToUint64(), timestamps[2].ToUint64());
@@ -355,11 +368,11 @@ TEST_F(HybridClockTest, TestRideOverNtpInterruption) {
#if defined(KUDU_HAS_SYSTEM_TIME_SOURCE)
TEST_F(HybridClockTest, TestNtpDiagnostics) {
FLAGS_time_source = "system";
- clock_.reset(new HybridClock);
- ASSERT_OK(clock_->Init());
+ HybridClock clock(metric_entity_);
+ ASSERT_OK(clock.Init());
vector<string> log;
- clock_->time_service()->DumpDiagnostics(&log);
+ clock.time_service()->DumpDiagnostics(&log);
string s = JoinStrings(log, "\n");
SCOPED_TRACE(s);
ASSERT_STR_MATCHES(s, "(ntp_gettime\\(\\) returns code |chronyc -n
tracking)");
diff --git a/src/kudu/clock/hybrid_clock.cc b/src/kudu/clock/hybrid_clock.cc
index 459d605..2be46ec 100644
--- a/src/kudu/clock/hybrid_clock.cc
+++ b/src/kudu/clock/hybrid_clock.cc
@@ -155,10 +155,18 @@ const int HybridClock::kBitsToShift = 12;
// This mask gives us back the logical bits.
const uint64_t HybridClock::kLogicalBitMask = (1 << kBitsToShift) - 1;
-
-HybridClock::HybridClock()
+HybridClock::HybridClock(const scoped_refptr<MetricEntity>& metric_entity)
: next_timestamp_(0),
state_(kNotInitialized) {
+ DCHECK(metric_entity);
+ METRIC_hybrid_clock_timestamp.InstantiateFunctionGauge(
+ metric_entity,
+ Bind(&HybridClock::NowForMetrics, Unretained(this)))->
+ AutoDetachToLastValue(&metric_detacher_);
+ METRIC_hybrid_clock_error.InstantiateFunctionGauge(
+ metric_entity,
+ Bind(&HybridClock::ErrorForMetrics, Unretained(this)))->
+ AutoDetachToLastValue(&metric_detacher_);
}
Status HybridClock::Init() {
@@ -253,7 +261,6 @@ Status HybridClock::GetGlobalLatest(Timestamp* t) {
}
void HybridClock::NowWithError(Timestamp* timestamp, uint64_t* max_error_usec)
{
-
DCHECK_EQ(state_, kInitialized) << "Clock not initialized. Must call Init()
first.";
uint64_t now_usec;
@@ -515,17 +522,6 @@ uint64_t HybridClock::ErrorForMetrics() {
return error;
}
-void HybridClock::RegisterMetrics(const scoped_refptr<MetricEntity>&
metric_entity) {
- METRIC_hybrid_clock_timestamp.InstantiateFunctionGauge(
- metric_entity,
- Bind(&HybridClock::NowForMetrics, Unretained(this)))
- ->AutoDetachToLastValue(&metric_detacher_);
- METRIC_hybrid_clock_error.InstantiateFunctionGauge(
- metric_entity,
- Bind(&HybridClock::ErrorForMetrics, Unretained(this)))
- ->AutoDetachToLastValue(&metric_detacher_);
-}
-
string HybridClock::Stringify(Timestamp timestamp) {
return StringifyTimestamp(timestamp);
}
diff --git a/src/kudu/clock/hybrid_clock.h b/src/kudu/clock/hybrid_clock.h
index dbfa201..be855de 100644
--- a/src/kudu/clock/hybrid_clock.h
+++ b/src/kudu/clock/hybrid_clock.h
@@ -39,7 +39,9 @@ namespace clock {
// since NTP clock error is not available.
class HybridClock : public Clock {
public:
- HybridClock();
+ // Create an instance, registering HybridClock's metrics with the specified
+ // metric entity.
+ explicit HybridClock(const scoped_refptr<MetricEntity>& metric_entity);
Status Init() override;
@@ -59,8 +61,6 @@ class HybridClock : public Clock {
// Updates the clock with a timestamp originating on another machine.
Status Update(const Timestamp& to_update) override;
- void RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity)
override;
-
// HybridClock supports all external consistency modes.
bool SupportsExternalConsistencyMode(ExternalConsistencyMode mode) override;
@@ -152,7 +152,7 @@ class HybridClock : public Clock {
// separated.
static std::string StringifyTimestamp(const Timestamp& timestamp);
- clock::TimeService* time_service() {
+ clock::TimeService* time_service() const {
return time_service_.get();
}
@@ -176,7 +176,7 @@ class HybridClock : public Clock {
// service.
std::unique_ptr<clock::TimeService> time_service_;
- mutable simple_spinlock lock_;
+ simple_spinlock lock_;
// The next timestamp to be generated from this clock, assuming that
// the physical clock hasn't advanced beyond the value stored here.
@@ -184,7 +184,7 @@ class HybridClock : public Clock {
// The last valid clock reading we got from the time source, along
// with the monotime that we took that reading.
- mutable simple_spinlock last_clock_read_lock_;
+ simple_spinlock last_clock_read_lock_;
MonoTime last_clock_read_time_;
uint64_t last_clock_read_physical_;
uint64_t last_clock_read_error_;
diff --git a/src/kudu/clock/logical_clock-test.cc
b/src/kudu/clock/logical_clock-test.cc
index a6b5cb4..7ee06b9 100644
--- a/src/kudu/clock/logical_clock-test.cc
+++ b/src/kudu/clock/logical_clock-test.cc
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include <memory>
-
#include <gtest/gtest.h>
#include "kudu/clock/logical_clock.h"
@@ -26,66 +24,64 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
-using std::unique_ptr;
-
namespace kudu {
namespace clock {
class LogicalClockTest : public KuduTest {
public:
LogicalClockTest()
- : clock_(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ : clock_(Timestamp::kInitialTimestamp) {
}
protected:
- unique_ptr<LogicalClock> clock_;
+ LogicalClock clock_;
};
// Test that two subsequent time reads are monotonically increasing.
-TEST_F(LogicalClockTest, TestNow_ValuesIncreaseMonotonically) {
- const Timestamp now1 = clock_->Now();
- const Timestamp now2 = clock_->Now();
+TEST_F(LogicalClockTest, NowValuesIncreaseMonotonically) {
+ const Timestamp now1 = clock_.Now();
+ const Timestamp now2 = clock_.Now();
ASSERT_EQ(now1.value() + 1, now2.value());
}
// Tests that the clock gets updated if the incoming value is higher.
-TEST_F(LogicalClockTest, TestUpdate_LogicalValueIncreasesByAmount) {
- Timestamp initial = clock_->Now();
+TEST_F(LogicalClockTest, UpdateLogicalValueIncreasesByAmount) {
+ Timestamp initial = clock_.Now();
Timestamp future(initial.value() + 10);
- clock_->Update(future);
- Timestamp now = clock_->Now();
+ clock_.Update(future);
+ Timestamp now = clock_.Now();
// now should be 1 after future
ASSERT_EQ(initial.value() + 11, now.value());
}
// Tests that the clock doesn't get updated if the incoming value is lower.
-TEST_F(LogicalClockTest, TestUpdate_LogicalValueDoesNotIncrease) {
+TEST_F(LogicalClockTest, UpdateLogicalValueDoesNotIncrease) {
Timestamp ts(1);
// update the clock to 1, the initial value, should do nothing
- clock_->Update(ts);
- Timestamp now = clock_->Now();
- ASSERT_EQ(now.value(), 2);
+ clock_.Update(ts);
+ Timestamp now = clock_.Now();
+ ASSERT_EQ(2, now.value());
}
TEST_F(LogicalClockTest, TestWaitUntilAfterIsUnavailable) {
- Status status = clock_->WaitUntilAfter(
+ Status status = clock_.WaitUntilAfter(
Timestamp(10), MonoTime::Now());
ASSERT_TRUE(status.IsServiceUnavailable());
}
TEST_F(LogicalClockTest, TestIsAfter) {
- Timestamp ts1 = clock_->Now();
- ASSERT_TRUE(clock_->IsAfter(ts1));
+ Timestamp ts1 = clock_.Now();
+ ASSERT_TRUE(clock_.IsAfter(ts1));
// Update the clock in the future, make sure it still
// handles "IsAfter" properly even when it's running in
// "logical" mode.
Timestamp now_increased = Timestamp(1000);
- ASSERT_OK(clock_->Update(now_increased));
- Timestamp ts2 = clock_->Now();
+ ASSERT_OK(clock_.Update(now_increased));
+ Timestamp ts2 = clock_.Now();
- ASSERT_TRUE(clock_->IsAfter(ts1));
- ASSERT_TRUE(clock_->IsAfter(ts2));
+ ASSERT_TRUE(clock_.IsAfter(ts1));
+ ASSERT_TRUE(clock_.IsAfter(ts2));
}
} // namespace clock
diff --git a/src/kudu/clock/logical_clock.cc b/src/kudu/clock/logical_clock.cc
index 6fa861e..3403e01 100644
--- a/src/kudu/clock/logical_clock.cc
+++ b/src/kudu/clock/logical_clock.cc
@@ -46,6 +46,17 @@ using std::unique_ptr;
namespace kudu {
namespace clock {
+LogicalClock::LogicalClock(const Timestamp& timestamp,
+ const scoped_refptr<MetricEntity>& metric_entity)
+ : now_(timestamp.value() - 1) {
+ if (metric_entity) {
+ METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
+ metric_entity,
+ Bind(&LogicalClock::GetCurrentTime, Unretained(this)))->
+ AutoDetachToLastValue(&metric_detacher_);
+ }
+}
+
Timestamp LogicalClock::Now() {
return Timestamp(Barrier_AtomicIncrement(&now_, 1));
}
@@ -90,23 +101,11 @@ bool LogicalClock::IsAfter(Timestamp t) {
return base::subtle::Acquire_Load(&now_) >= t.value();
}
-unique_ptr<LogicalClock> LogicalClock::CreateStartingAt(const Timestamp&
timestamp) {
- // initialize at 'timestamp' - 1 so that the first output value is
'timestamp'.
- return unique_ptr<LogicalClock>(new LogicalClock(timestamp.value() - 1));
-}
-
uint64_t LogicalClock::GetCurrentTime() {
// We don't want reading metrics to change the clock.
return NoBarrier_Load(&now_);
}
-void LogicalClock::RegisterMetrics(const scoped_refptr<MetricEntity>&
metric_entity) {
- METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
- metric_entity,
- Bind(&LogicalClock::GetCurrentTime, Unretained(this)))
- ->AutoDetachToLastValue(&metric_detacher_);
-}
-
std::string LogicalClock::Stringify(Timestamp timestamp) {
return strings::Substitute("L: $0", timestamp.ToUint64());
}
diff --git a/src/kudu/clock/logical_clock.h b/src/kudu/clock/logical_clock.h
index 0a74a51..bd92945 100644
--- a/src/kudu/clock/logical_clock.h
+++ b/src/kudu/clock/logical_clock.h
@@ -17,7 +17,6 @@
#pragma once
#include <cstdint>
-#include <memory>
#include <string>
#include "kudu/clock/clock.h"
@@ -47,6 +46,9 @@ namespace clock {
// NOTE: this class is thread safe.
class LogicalClock : public Clock {
public:
+ // Create logical clock starting with the given timestamp.
+ explicit LogicalClock(const Timestamp& timestamp,
+ const scoped_refptr<MetricEntity>& metric_entity = {});
Status Init() override { return Status::OK(); }
@@ -65,8 +67,6 @@ class LogicalClock : public Clock {
bool IsAfter(Timestamp t) override;
- void RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity)
override;
-
std::string Stringify(Timestamp timestamp) override;
// Used to get the timestamp without incrementing the logical component.
@@ -78,13 +78,7 @@ class LogicalClock : public Clock {
return mode != COMMIT_WAIT;
}
- // Creates a logical clock whose first output value on a Now() call is
'timestamp'.
- static std::unique_ptr<LogicalClock> CreateStartingAt(const Timestamp&
timestamp);
-
private:
- // Should use LogicalClock::CreatingStartingAt()
- explicit LogicalClock(Timestamp::val_type initial_time) : now_(initial_time)
{}
-
base::subtle::Atomic64 now_;
FunctionGaugeDetacher metric_detacher_;
diff --git a/src/kudu/consensus/consensus_peers-test.cc
b/src/kudu/consensus/consensus_peers-test.cc
index d3afb92..4a73948 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -54,18 +54,19 @@
#include "kudu/util/threadpool.h"
METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_entity(server);
-namespace kudu {
-namespace consensus {
-
-using log::Log;
-using log::LogOptions;
-using rpc::Messenger;
-using rpc::MessengerBuilder;
+using kudu::log::Log;
+using kudu::log::LogOptions;
+using kudu::rpc::Messenger;
+using kudu::rpc::MessengerBuilder;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
+namespace kudu {
+namespace consensus {
+
const char* kTabletId = "test-peers-tablet";
const char* kLeaderUuid = "peer-0";
const char* kFollowerUuid = "peer-1";
@@ -73,8 +74,11 @@ const char* kFollowerUuid = "peer-1";
class ConsensusPeersTest : public KuduTest {
public:
ConsensusPeersTest()
- : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_,
"peer-test")),
- schema_(GetSimpleTestSchema()) {
+ : metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+ &metric_registry_, "consensus-peer-test::server")),
+ metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+ &metric_registry_, "consensus-peer-test::tablet")),
+ schema_(GetSimpleTestSchema()) {
CHECK_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ =
raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
}
@@ -92,13 +96,13 @@ class ConsensusPeersTest : public KuduTest {
0, // schema_version
/*metric_entity*/nullptr,
&log_));
- clock_.reset(new clock::HybridClock());
+ clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
time_manager_.reset(new TimeManager(clock_.get(), Timestamp::kMin));
message_queue_.reset(new PeerMessageQueue(
- metric_entity_,
+ metric_entity_tablet_,
log_.get(),
time_manager_.get(),
FakeRaftPeerPB(kLeaderUuid),
@@ -164,7 +168,8 @@ class ConsensusPeersTest : public KuduTest {
protected:
MetricRegistry metric_registry_;
- scoped_refptr<MetricEntity> metric_entity_;
+ scoped_refptr<MetricEntity> metric_entity_server_;
+ scoped_refptr<MetricEntity> metric_entity_tablet_;
unique_ptr<FsManager> fs_manager_;
scoped_refptr<Log> log_;
unique_ptr<ThreadPool> raft_pool_;
diff --git a/src/kudu/consensus/consensus_queue-test.cc
b/src/kudu/consensus/consensus_queue-test.cc
index 572fe31..b77c8db 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -84,7 +84,10 @@ class ConsensusQueueTest : public KuduTest {
public:
ConsensusQueueTest()
: schema_(GetSimpleTestSchema()),
- metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_,
"queue-test")),
+ metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+ &metric_registry_, "consensus-queue-test::server")),
+ metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+ &metric_registry_, "consensus-queue-test::tablet")),
registry_(new log::LogAnchorRegistry),
quiescing_(false) {
}
@@ -102,7 +105,7 @@ class ConsensusQueueTest : public KuduTest {
/*schema_version*/0,
/*metric_entity*/nullptr,
&log_));
- clock_.reset(new clock::HybridClock());
+ clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -112,7 +115,7 @@ class ConsensusQueueTest : public KuduTest {
void CloseAndReopenQueue(const OpId& replicated_opid, const OpId&
committed_opid) {
queue_.reset(new PeerMessageQueue(
- metric_entity_,
+ metric_entity_tablet_,
log_.get(),
time_manager_.get(),
FakeRaftPeerPB(kLeaderUuid),
@@ -234,9 +237,10 @@ class ConsensusQueueTest : public KuduTest {
protected:
const Schema schema_;
- gscoped_ptr<FsManager> fs_manager_;
MetricRegistry metric_registry_;
- scoped_refptr<MetricEntity> metric_entity_;
+ scoped_refptr<MetricEntity> metric_entity_server_;
+ scoped_refptr<MetricEntity> metric_entity_tablet_;
+ gscoped_ptr<FsManager> fs_manager_;
scoped_refptr<log::Log> log_;
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<TimeManager> time_manager_;
diff --git a/src/kudu/consensus/log-test-base.h
b/src/kudu/consensus/log-test-base.h
index 345492a..1ae43ae 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -156,7 +156,7 @@ class LogTestBase : public KuduTest {
KuduTest::SetUp();
current_index_ = kStartIndex;
fs_manager_.reset(new FsManager(env_,
FsManagerOpts(GetTestPath("fs_root"))));
- metric_registry_.reset(new MetricRegistry());
+ metric_registry_.reset(new MetricRegistry);
metric_entity_tablet_ = METRIC_ENTITY_tablet.Instantiate(
metric_registry_.get(), "tablet");
metric_entity_server_ = METRIC_ENTITY_server.Instantiate(
@@ -168,7 +168,7 @@ class LogTestBase : public KuduTest {
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
- clock_.reset(new clock::HybridClock());
+ clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
}
diff --git a/src/kudu/consensus/log_cache-test.cc
b/src/kudu/consensus/log_cache-test.cc
index 8ebc97a..7057555 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -66,6 +66,7 @@ using strings::Substitute;
DECLARE_int32(log_cache_size_limit_mb);
DECLARE_int32(global_log_cache_size_limit_mb);
+METRIC_DECLARE_entity(server);
METRIC_DECLARE_entity(tablet);
namespace kudu {
@@ -77,8 +78,11 @@ static const char* kTestTablet = "test-tablet";
class LogCacheTest : public KuduTest {
public:
LogCacheTest()
- : schema_(GetSimpleTestSchema()),
- metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_,
"LogCacheTest")) {
+ : schema_(GetSimpleTestSchema()),
+ metric_entity_server_(METRIC_ENTITY_server.Instantiate(
+ &metric_registry_, "LogCacheTest::server")),
+ metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
+ &metric_registry_, "LogCacheTest::tablet")) {
}
virtual void SetUp() OVERRIDE {
@@ -96,7 +100,7 @@ class LogCacheTest : public KuduTest {
&log_));
CloseAndReopenCache(MinimumOpId());
- clock_.reset(new clock::HybridClock());
+ clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
}
@@ -105,7 +109,7 @@ class LogCacheTest : public KuduTest {
}
void CloseAndReopenCache(const OpId& preceding_id) {
- cache_.reset(new LogCache(metric_entity_,
+ cache_.reset(new LogCache(metric_entity_tablet_,
log_.get(),
kPeerUuid,
kTestTablet));
@@ -133,7 +137,8 @@ class LogCacheTest : public KuduTest {
const Schema schema_;
MetricRegistry metric_registry_;
- scoped_refptr<MetricEntity> metric_entity_;
+ scoped_refptr<MetricEntity> metric_entity_server_;
+ scoped_refptr<MetricEntity> metric_entity_tablet_;
unique_ptr<FsManager> fs_manager_;
unique_ptr<LogCache> cache_;
scoped_refptr<log::Log> log_;
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 3a60f68..2fb196b 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -31,7 +31,6 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
@@ -119,9 +118,9 @@ class RaftConsensusQuorumTest : public KuduTest {
typedef vector<unique_ptr<LogEntryPB>> LogEntries;
RaftConsensusQuorumTest()
- : clock_(clock::LogicalClock::CreateStartingAt(Timestamp(1))),
- metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_,
"raft-test")),
- schema_(GetSimpleTestSchema()) {
+ : clock_(Timestamp(1)),
+ metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_,
"raft-test")),
+ schema_(GetSimpleTestSchema()) {
options_.tablet_id = kTestTablet;
FLAGS_enable_leader_failure_detection = false;
CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
@@ -217,7 +216,7 @@ class RaftConsensusQuorumTest : public KuduTest {
unique_ptr<PeerProxyFactory> proxy_factory(
new LocalTestPeerProxyFactory(peers_.get()));
unique_ptr<TimeManager> time_manager(
- new TimeManager(clock_.get(), Timestamp::kMin));
+ new TimeManager(&clock_, Timestamp::kMin));
unique_ptr<TestTransactionFactory> txn_factory(
new TestTransactionFactory(logs_[i].get()));
txn_factory->SetConsensus(peer.get());
@@ -274,7 +273,7 @@ class RaftConsensusQuorumTest : public KuduTest {
gscoped_ptr<ReplicateMsg> msg(new ReplicateMsg());
msg->set_op_type(NO_OP);
msg->mutable_noop_request();
- msg->set_timestamp(clock_->Now().ToUint64());
+ msg->set_timestamp(clock_.Now().ToUint64());
shared_ptr<RaftConsensus> peer;
CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
@@ -584,7 +583,7 @@ class RaftConsensusQuorumTest : public KuduTest {
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<TestPeerMapManager> peers_;
vector<unique_ptr<TestTransactionFactory>> txn_factories_;
- unique_ptr<clock::Clock> clock_;
+ clock::LogicalClock clock_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
const Schema schema_;
@@ -962,7 +961,7 @@ TEST_F(RaftConsensusQuorumTest,
TestReplicasEnforceTheLogMatchingProperty) {
// Send a request with the next index.
ReplicateMsg* replicate = req.add_ops();
- replicate->set_timestamp(clock_->Now().ToUint64());
+ replicate->set_timestamp(clock_.Now().ToUint64());
OpId* id = replicate->mutable_id();
id->set_term(last_op_id.term());
id->set_index(last_op_id.index() + 1);
diff --git a/src/kudu/consensus/time_manager-test.cc
b/src/kudu/consensus/time_manager-test.cc
index c1f37d1..bd02933 100644
--- a/src/kudu/consensus/time_manager-test.cc
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -27,7 +27,9 @@
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/consensus.pb.h"
+#include "kudu/gutil/ref_counted.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -37,15 +39,21 @@ using std::thread;
using std::unique_ptr;
using std::vector;
+METRIC_DECLARE_entity(server);
+
namespace kudu {
namespace consensus {
class TimeManagerTest : public KuduTest {
public:
- TimeManagerTest() : clock_(new clock::HybridClock) {}
+ TimeManagerTest()
+ : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+ "time-manager-test")),
+ clock_(metric_entity_) {
+ }
void SetUp() override {
- CHECK_OK(clock_->Init());
+ CHECK_OK(clock_.Init());
}
void TearDown() override {
@@ -56,7 +64,7 @@ class TimeManagerTest : public KuduTest {
protected:
void InitTimeManager(Timestamp initial_safe_time = Timestamp::kMin) {
- time_manager_.reset(new TimeManager(clock_.get(), initial_safe_time));
+ time_manager_.reset(new TimeManager(&clock_, initial_safe_time));
}
// Returns a latch that allows to wait for TimeManager to consider
'safe_time' safe.
@@ -72,7 +80,9 @@ class TimeManagerTest : public KuduTest {
return latch;
}
- unique_ptr<clock::HybridClock> clock_;
+ MetricRegistry metric_registry_;
+ scoped_refptr<MetricEntity> metric_entity_;
+ clock::HybridClock clock_;
unique_ptr<TimeManager> time_manager_;
vector<unique_ptr<CountDownLatch>> latches_;
vector<thread> threads_;
@@ -81,7 +91,7 @@ class TimeManagerTest : public KuduTest {
// Tests TimeManager's functionality in non-leader mode and the transition to
leader mode.
TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
// TimeManager should start in non-leader mode and consider the initial
timestamp safe.
- Timestamp before = clock_->Now();
+ Timestamp before = clock_.Now();
Timestamp init(before.value() + 1);
Timestamp after(init.value() + 1);
InitTimeManager(init);
@@ -129,14 +139,14 @@ TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
// Advance 'after' again and test advancing safe time with an explicit
timestamp like
// the leader sends on (empty) heartbeat messages.
- after = clock_->Now();
+ after = clock_.Now();
after_latch = WaitForSafeTimeAsync(after);
time_manager_->AdvanceSafeTime(after);
after_latch->Wait();
ASSERT_EQ(time_manager_->GetSafeTime(), after);
// Changing to leader mode should advance safe time.
- after = clock_->Now();
+ after = clock_.Now();
after_latch = WaitForSafeTimeAsync(after);
time_manager_->SetLeaderMode();
after_latch->Wait();
@@ -145,7 +155,7 @@ TEST_F(TimeManagerTest, TestTimeManagerNonLeaderMode) {
// Tests the TimeManager's functionality in leader mode and the transition to
non-leader mode.
TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
- Timestamp init = clock_->Now();
+ Timestamp init = clock_.Now();
InitTimeManager(init);
time_manager_->SetLeaderMode();
Timestamp safe_before = time_manager_->GetSafeTime();
@@ -165,7 +175,7 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
// .. as should AdvanceSafeTime()
EXPECT_DEATH({
- time_manager_->AdvanceSafeTime(clock_->Now());
+ time_manager_->AdvanceSafeTime(clock_.Now());
}, "Cannot advance safe time by timestamp in leader mode.");
// Since we haven't appended the message to the queue, safe time should be
'pinned' to
@@ -177,13 +187,13 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
ASSERT_GT(time_manager_->GetSafeTime(), message_ts);
// 'Now' should be safe.
- Timestamp now = clock_->Now();
+ Timestamp now = clock_.Now();
ASSERT_TRUE(time_manager_->IsTimestampSafe(now));
ASSERT_GT(time_manager_->GetSafeTime(), now);
// When changing to non-leader mode a timestamp after the last safe time
shouldn't be
// safe anymore (even if that time came before the actual change).
- now = clock_->Now();
+ now = clock_.Now();
time_manager_->SetNonLeaderMode();
Timestamp safe_after = time_manager_->GetSafeTime();
ASSERT_LE(safe_after, now);
@@ -191,13 +201,13 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
// In leader mode GetSafeTime() usually moves it, but since we changed to
non-leader mode
// safe time shouldn't move anymore ...
ASSERT_EQ(time_manager_->GetSafeTime(), safe_after);
- now = clock_->Now();
+ now = clock_.Now();
MonoTime after_small = MonoTime::Now();
after_small.AddDelta(MonoDelta::FromMilliseconds(100));
ASSERT_TRUE(time_manager_->WaitUntilSafe(now, after_small).IsTimedOut());
// ... unless we get a message from the leader.
- now = clock_->Now();
+ now = clock_.Now();
CountDownLatch* after_latch = WaitForSafeTimeAsync(now);
message.set_timestamp(now.value());
ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc
b/src/kudu/integration-tests/ts_recovery-itest.cc
index 04541ac..f0f4e7e 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -33,7 +33,6 @@
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/client/write_op.h"
-#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
@@ -88,7 +87,6 @@ using kudu::client::KuduUpdate;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalTabletServer;
using kudu::cluster::ExternalMiniClusterOptions;
-using kudu::clock::Clock;
using kudu::clock::HybridClock;
using kudu::consensus::ConsensusMetadata;
using kudu::consensus::ConsensusMetadataManager;
@@ -590,8 +588,11 @@ TEST_P(TsRecoveryITestDeathTest,
TestRecoverFromOpIdOverflow) {
ASSERT_OK(fs_manager->Open());
scoped_refptr<ConsensusMetadataManager> cmeta_manager(
new ConsensusMetadataManager(fs_manager.get()));
- unique_ptr<Clock> clock(new HybridClock);
- ASSERT_OK(clock->Init());
+ MetricRegistry metric_registry;
+ auto metric_entity(METRIC_ENTITY_server.Instantiate(&metric_registry,
+ "ts-recoverty-itest"));
+ HybridClock clock(metric_entity);
+ ASSERT_OK(clock.Init());
OpId opid;
opid.set_term(kOverflowedIndexValue);
@@ -610,7 +611,7 @@ TEST_P(TsRecoveryITestDeathTest,
TestRecoverFromOpIdOverflow) {
// Write a series of negative OpIds.
// This will cause a crash, but only after they have been written to
disk.
- ASSERT_OK(AppendNoOpsToLogSync(clock.get(), log.get(), &opid,
kNumOverflowedEntriesToWrite));
+ ASSERT_OK(AppendNoOpsToLogSync(&clock, log.get(), &opid,
kNumOverflowedEntriesToWrite));
}, "Check failed: log_index > 0");
// Before restarting the tablet server, delete the initial log segment from
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 74be49c..a5bf5ad 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -433,9 +433,10 @@ ServerBase::ServerBase(string name, const
ServerBaseOptions& options,
fs_manager_.reset(new FsManager(options.env, std::move(fs_opts)));
if (FLAGS_use_hybrid_clock) {
- clock_.reset(new clock::HybridClock);
+ clock_.reset(new clock::HybridClock(metric_entity_));
} else {
- clock_ =
clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
+ clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
+ metric_entity_));
}
if (FLAGS_webserver_enabled) {
@@ -544,7 +545,6 @@ Status ServerBase::Init() {
RETURN_NOT_OK(rpc_server_->Init(messenger_));
RETURN_NOT_OK(rpc_server_->Bind());
- clock_->RegisterMetrics(metric_entity_);
RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics
logging");
diff --git a/src/kudu/tablet/compaction-test.cc
b/src/kudu/tablet/compaction-test.cc
index 65144e8..974fa18 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -109,12 +109,12 @@ static const size_t kSmallRollThreshold = 1024; // 1KB
class TestCompaction : public KuduRowSetTest {
public:
TestCompaction()
- : KuduRowSetTest(CreateSchema()),
- op_id_(consensus::MaximumOpId()),
- row_builder_(&schema_),
- arena_(32*1024),
-
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
- log_anchor_registry_(new log::LogAnchorRegistry()) {
+ : KuduRowSetTest(CreateSchema()),
+ op_id_(consensus::MaximumOpId()),
+ row_builder_(&schema_),
+ arena_(32*1024),
+ clock_(Timestamp::kInitialTimestamp),
+ log_anchor_registry_(new log::LogAnchorRegistry()) {
}
static Schema CreateSchema() {
@@ -137,7 +137,7 @@ class TestCompaction : public KuduRowSetTest {
// The 'nullable_val' column is set to either NULL (when val is odd)
// or 'val' (when val is even).
void InsertRow(MemRowSet *mrs, int row_key, int32_t val) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
InsertRowInTransaction(mrs, tx, row_key, val);
tx.Commit();
@@ -188,7 +188,7 @@ class TestCompaction : public KuduRowSetTest {
}
void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
UpdateRowInTransaction(rowset, tx, row_key, new_val);
tx.Commit();
@@ -243,7 +243,7 @@ class TestCompaction : public KuduRowSetTest {
}
void DeleteRow(RowSet* rowset, int row_key) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
DeleteRowInTransaction(rowset, tx, row_key);
tx.Commit();
@@ -484,7 +484,7 @@ class TestCompaction : public KuduRowSetTest {
RowBuilder row_builder_;
char key_buf_[256];
Arena arena_;
- unique_ptr<clock::LogicalClock> clock_;
+ clock::LogicalClock clock_;
MvccManager mvcc_;
scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
@@ -663,7 +663,7 @@ void TestCompaction::AddExpectedDelete(Mutation**
current_head, Timestamp ts) {
faststring buf;
RowChangeListEncoder enc(&buf);
enc.SetToDelete();
- if (ts == Timestamp::kInvalidTimestamp) ts =
Timestamp(clock_->GetCurrentTime());
+ if (ts == Timestamp::kInvalidTimestamp) ts =
Timestamp(clock_.GetCurrentTime());
Mutation* mutation = Mutation::CreateInArena(&arena_,
ts,
enc.as_changelist());
@@ -682,7 +682,7 @@ void TestCompaction::AddExpectedUpdate(Mutation**
current_head, int32_t val) {
enc.AddColumnUpdate(schema_.column(2), schema_.column_id(2), nullptr);
}
Mutation* mutation = Mutation::CreateInArena(&arena_,
-
Timestamp(clock_->GetCurrentTime()),
+
Timestamp(clock_.GetCurrentTime()),
enc.as_changelist());
mutation->set_next(*current_head);
*current_head = mutation;
@@ -698,7 +698,8 @@ void TestCompaction::AddExpectedReinsert(Mutation**
current_head, int32_t val) {
} else {
enc.EncodeColumnMutation(schema_.column(2), schema_.column_id(2), nullptr);
}
- Mutation* mutation = Mutation::CreateInArena(&arena_,
Timestamp(clock_->GetCurrentTime()),
+ Mutation* mutation = Mutation::CreateInArena(&arena_,
+
Timestamp(clock_.GetCurrentTime()),
enc.as_changelist());
mutation->set_next(*current_head);
*current_head = mutation;
@@ -857,7 +858,7 @@ TEST_F(TestCompaction,
TestMRSCompactionDoesntOutputUnobservableRows) {
shared_ptr<MemRowSet> mrs;
ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
mem_trackers_.tablet_tracker, &mrs));
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
DeleteRowInTransaction(rs1.get(), tx, 1);
InsertRowInTransaction(mrs.get(), tx, 1, 2);
diff --git a/src/kudu/tablet/deltamemstore-test.cc
b/src/kudu/tablet/deltamemstore-test.cc
index 09abc9a..4a37c39 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -34,7 +34,6 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
@@ -92,9 +91,9 @@ using fs::WritableBlock;
class TestDeltaMemStore : public KuduTest {
public:
TestDeltaMemStore()
- : op_id_(consensus::MaximumOpId()),
- schema_(CreateSchema()),
-
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ : op_id_(consensus::MaximumOpId()),
+ schema_(CreateSchema()),
+ clock_(Timestamp::kInitialTimestamp) {
CHECK_OK(DeltaMemStore::Create(0, 0,
new log::LogAnchorRegistry(),
MemTracker::GetRootTracker(), &dms_));
@@ -115,7 +114,7 @@ class TestDeltaMemStore : public KuduTest {
RowChangeListEncoder update(&buf);
for (uint32_t idx_to_update : indexes_to_update) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
update.Reset();
uint32_t new_val = idx_to_update * 10;
@@ -161,7 +160,7 @@ class TestDeltaMemStore : public KuduTest {
const Schema schema_;
shared_ptr<DeltaMemStore> dms_;
- unique_ptr<clock::Clock> clock_;
+ clock::LogicalClock clock_;
MvccManager mvcc_;
};
@@ -193,7 +192,7 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
schema_.column_id(kStringColumn), &s);
}
if (idx % 2 == 0) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
uint32_t new_val = idx * 10;
update.AddColumnUpdate(schema_.column(kIntColumn),
@@ -267,7 +266,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
faststring buf;
RowChangeListEncoder update(&buf);
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
string str(kStringDataSize, 'x');
Slice s(str);
@@ -276,7 +275,7 @@ TEST_F(TestDeltaMemStore, BenchmarkManyUpdatesToOneRow) {
CHECK_OK(dms_->Update(tx.timestamp(), kIdxToUpdate, RowChangeList(buf),
op_id_));
tx.Commit();
}
- mvcc_.AdjustSafeTime(clock_->Now());
+ mvcc_.AdjustSafeTime(clock_.Now());
MvccSnapshot snap(mvcc_);
LOG_TIMING(INFO, "Applying updates") {
@@ -390,7 +389,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
// the update gets cleared after usage. This ensures that the
// underlying data is properly copied into the DMS arena.
{
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
char buf[256] = "update 1";
Slice s(buf);
@@ -404,7 +403,7 @@ TEST_F(TestDeltaMemStore, TestReUpdateSlice) {
// Update the same cell again with a different value
{
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
char buf[256] = "update 2";
Slice s(buf);
@@ -441,8 +440,8 @@ TEST_F(TestDeltaMemStore, TestOutOfOrderTxns) {
RowChangeListEncoder update(&update_buf);
{
- ScopedTransaction tx1(&mvcc_, clock_->Now());
- ScopedTransaction tx2(&mvcc_, clock_->Now());
+ ScopedTransaction tx1(&mvcc_, clock_.Now());
+ ScopedTransaction tx2(&mvcc_, clock_.Now());
tx2.StartApplying();
Slice s("update 2");
@@ -476,7 +475,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
char buf[256];
for (uint32_t i = 0; i < 1000; i++) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
update.Reset();
@@ -520,7 +519,7 @@ TEST_F(TestDeltaMemStore, TestDMSBasic) {
// these are separate transactions and we need to maintain the
// old ones for snapshot consistency purposes.
for (uint32_t i = 0; i < 1000; i++) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
update.Reset();
diff --git a/src/kudu/tablet/diskrowset-test-base.h
b/src/kudu/tablet/diskrowset-test-base.h
index 0c7190e..8b3dfbc 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -55,11 +55,11 @@ namespace tablet {
class TestRowSet : public KuduRowSetTest {
public:
TestRowSet()
- : KuduRowSetTest(CreateTestSchema()),
- n_rows_(FLAGS_roundtrip_num_rows),
- op_id_(consensus::MaximumOpId()),
-
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
- log_anchor_registry_(new log::LogAnchorRegistry()) {
+ : KuduRowSetTest(CreateTestSchema()),
+ n_rows_(FLAGS_roundtrip_num_rows),
+ op_id_(consensus::MaximumOpId()),
+ clock_(Timestamp::kInitialTimestamp),
+ log_anchor_registry_(new log::LogAnchorRegistry()) {
CHECK_GT(n_rows_, 0);
}
@@ -185,7 +185,7 @@ class TestRowSet : public KuduRowSetTest {
RowSetKeyProbe probe(rb.row());
ProbeStats stats;
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
Status s = rs->MutateRow(tx.timestamp(), probe, mutation, op_id_, nullptr,
&stats, result);
tx.Commit();
@@ -339,7 +339,7 @@ class TestRowSet : public KuduRowSetTest {
size_t n_rows_;
consensus::OpId op_id_; // Generally a "fake" OpId for these tests.
- std::unique_ptr<clock::Clock> clock_;
+ clock::LogicalClock clock_;
MvccManager mvcc_;
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
};
diff --git a/src/kudu/tablet/diskrowset-test.cc
b/src/kudu/tablet/diskrowset-test.cc
index 3437f7f..bc6f3f7 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -33,7 +33,6 @@
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
@@ -412,7 +411,7 @@ TEST_F(TestRowSet, TestFlushedUpdatesRespectMVCC) {
RowChangeListEncoder update(&update_buf);
for (uint32_t i = 2; i <= 5; i++) {
{
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
update.Reset();
update.AddColumnUpdate(schema_.column(1), schema_.column_id(1), &i);
@@ -658,7 +657,7 @@ TEST_F(TestRowSet, TestGCAncientStores) {
// Delete all the UNDO deltas. There shouldn't be any delta stores left.
int64_t blocks_deleted;
int64_t bytes_deleted;
- ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_->Now(), nullptr,
&blocks_deleted, &bytes_deleted));
+ ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_.Now(), nullptr,
&blocks_deleted, &bytes_deleted));
ASSERT_GT(blocks_deleted, 0);
ASSERT_GT(bytes_deleted, 0);
ASSERT_EQ(0, dt->CountUndoDeltaStores());
@@ -710,7 +709,7 @@ class DiffScanRowSetTest : public KuduRowSetTest,
DiffScanRowSetTest()
: KuduRowSetTest(CreateTestSchema()),
op_id_(consensus::MaximumOpId()),
-
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ clock_(Timestamp::kInitialTimestamp) {
}
protected:
@@ -723,7 +722,7 @@ class DiffScanRowSetTest : public KuduRowSetTest,
}
consensus::OpId op_id_;
- unique_ptr<clock::Clock> clock_;
+ clock::LogicalClock clock_;
MvccManager mvcc_;
};
@@ -850,7 +849,7 @@ TEST_P(DiffScanRowSetTest, TestFuzz) {
RowSetKeyProbe probe(rb.row());
// Apply the mutation.
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
ProbeStats stats;
OperationResultPB result;
diff --git a/src/kudu/tablet/memrowset-test.cc
b/src/kudu/tablet/memrowset-test.cc
index f3a5c39..773a543 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -32,7 +32,6 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
@@ -87,7 +86,7 @@ class TestMemRowSet : public KuduTest {
log_anchor_registry_(new LogAnchorRegistry()),
schema_(CreateSchema()),
key_schema_(schema_.CreateKeyProjection()),
-
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ clock_(Timestamp::kInitialTimestamp) {
}
static Schema CreateSchema() {
@@ -148,7 +147,7 @@ class TestMemRowSet : public KuduTest {
}
Status InsertRow(MemRowSet *mrs, const string &key, uint32_t val) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
RowBuilder rb(&schema_);
rb.AddString(key);
rb.AddUint32(val);
@@ -162,7 +161,7 @@ class TestMemRowSet : public KuduTest {
const string &key,
uint32_t new_val,
OperationResultPB* result) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
mutation_buf_.clear();
@@ -185,7 +184,7 @@ class TestMemRowSet : public KuduTest {
}
Status DeleteRow(MemRowSet *mrs, const string &key, OperationResultPB*
result) {
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
mutation_buf_.clear();
@@ -261,7 +260,7 @@ class TestMemRowSet : public KuduTest {
faststring mutation_buf_;
const Schema schema_;
const Schema key_schema_;
- unique_ptr<clock::Clock> clock_;
+ clock::LogicalClock clock_;
MvccManager mvcc_;
};
@@ -309,7 +308,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
RowBuilder rb(&compound_key_schema);
{
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
rb.AddString(string("hello world"));
rb.AddInt32(1);
@@ -320,7 +319,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
}
{
- ScopedTransaction tx2(&mvcc_, clock_->Now());
+ ScopedTransaction tx2(&mvcc_, clock_.Now());
tx2.StartApplying();
rb.Reset();
rb.AddString(string("goodbye world"));
@@ -332,7 +331,7 @@ TEST_F(TestMemRowSet, TestInsertAndIterateCompoundKey) {
}
{
- ScopedTransaction tx3(&mvcc_, clock_->Now());
+ ScopedTransaction tx3(&mvcc_, clock_.Now());
tx3.StartApplying();
rb.Reset();
rb.AddString(string("goodbye world"));
@@ -549,7 +548,7 @@ TEST_F(TestMemRowSet, TestInsertionMVCC) {
// Insert 5 rows in tx 0 through 4
for (uint32_t i = 0; i < 5; i++) {
{
- ScopedTransaction tx(&mvcc_, clock_->Now());
+ ScopedTransaction tx(&mvcc_, clock_.Now());
tx.StartApplying();
RowBuilder rb(&schema_);
char keybuf[256];
diff --git a/src/kudu/tablet/mvcc-test.cc b/src/kudu/tablet/mvcc-test.cc
index d7cf1a5..0b42177 100644
--- a/src/kudu/tablet/mvcc-test.cc
+++ b/src/kudu/tablet/mvcc-test.cc
@@ -27,11 +27,11 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -40,16 +40,15 @@
using std::thread;
using std::unique_ptr;
+METRIC_DECLARE_entity(server);
+
namespace kudu {
namespace tablet {
-using clock::Clock;
-using clock::HybridClock;
-
class MvccTest : public KuduTest {
public:
MvccTest()
- :
clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ : clock_(Timestamp::kInitialTimestamp) {
}
void WaitForSnapshotAtTSThread(MvccManager* mgr, Timestamp ts) {
@@ -66,9 +65,8 @@ class MvccTest : public KuduTest {
}
protected:
- unique_ptr<clock::Clock> clock_;
-
- mutable simple_spinlock lock_;
+ clock::LogicalClock clock_;
+ simple_spinlock lock_;
unique_ptr<MvccSnapshot> result_snapshot_;
};
@@ -83,7 +81,7 @@ TEST_F(MvccTest, TestMvccBasic) {
ASSERT_FALSE(snap.IsCommitted(Timestamp(2)));
// Start timestamp 1
- Timestamp t = clock_->Now();
+ Timestamp t = clock_.Now();
ASSERT_EQ(1, t.value());
mgr.StartTransaction(t);
@@ -113,10 +111,10 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
MvccManager mgr;
MvccSnapshot snap;
- Timestamp t1 = clock_->Now();
+ Timestamp t1 = clock_.Now();
ASSERT_EQ(1, t1.value());
mgr.StartTransaction(t1);
- Timestamp t2 = clock_->Now();
+ Timestamp t2 = clock_.Now();
ASSERT_EQ(2, t2.value());
mgr.StartTransaction(t2);
@@ -140,7 +138,7 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
ASSERT_TRUE(snap.IsCommitted(t2));
// Start another transaction. This gets timestamp 3
- Timestamp t3 = clock_->Now();
+ Timestamp t3 = clock_.Now();
ASSERT_EQ(3, t3.value());
mgr.StartTransaction(t3);
@@ -182,18 +180,20 @@ TEST_F(MvccTest, TestMvccMultipleInFlight) {
}
TEST_F(MvccTest, TestOutOfOrderTxns) {
- unique_ptr<Clock> hybrid_clock(new HybridClock);
- ASSERT_OK(hybrid_clock->Init());
+ MetricRegistry metric_registry;
+ auto metric_entity(METRIC_ENTITY_server.Instantiate(&metric_registry,
"mvcc-test"));
+ clock::HybridClock hybrid_clock(metric_entity);
+ ASSERT_OK(hybrid_clock.Init());
MvccManager mgr;
// Start a normal non-commit-wait txn.
- Timestamp normal_txn = hybrid_clock->Now();
+ Timestamp normal_txn = hybrid_clock.Now();
mgr.StartTransaction(normal_txn);
MvccSnapshot s1(mgr);
// Start a transaction as if it were using commit-wait (i.e. started in
future)
- Timestamp cw_txn = hybrid_clock->NowLatest();
+ Timestamp cw_txn = hybrid_clock.NowLatest();
mgr.StartTransaction(cw_txn);
// Commit the original txn
@@ -201,7 +201,7 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
mgr.CommitTransaction(normal_txn);
// Start a new txn
- Timestamp normal_txn_2 = hybrid_clock->Now();
+ Timestamp normal_txn_2 = hybrid_clock.Now();
mgr.StartTransaction(normal_txn_2);
// The old snapshot should not have either txn
@@ -214,7 +214,7 @@ TEST_F(MvccTest, TestOutOfOrderTxns) {
EXPECT_FALSE(s2.IsCommitted(normal_txn_2));
// Commit the commit-wait one once it is time.
- ASSERT_OK(hybrid_clock->WaitUntilAfter(cw_txn, MonoTime::Max()));
+ ASSERT_OK(hybrid_clock.WaitUntilAfter(cw_txn, MonoTime::Max()));
mgr.StartApplyingTransaction(cw_txn);
mgr.CommitTransaction(cw_txn);
@@ -229,7 +229,7 @@ TEST_F(MvccTest, TestSafeTimeWithOutOfOrderTxns) {
MvccManager mgr;
// Set the clock to some time in the "future".
- ASSERT_OK(clock_->Update(Timestamp(100)));
+ ASSERT_OK(clock_.Update(Timestamp(100)));
// Start a transaction in the "past"
Timestamp txn_in_the_past(50);
@@ -266,8 +266,8 @@ TEST_F(MvccTest, TestScopedTransaction) {
MvccSnapshot snap;
{
- ScopedTransaction t1(&mgr, clock_->Now());
- ScopedTransaction t2(&mgr, clock_->Now());
+ ScopedTransaction t1(&mgr, clock_.Now());
+ ScopedTransaction t2(&mgr, clock_.Now());
ASSERT_EQ(1, t1.timestamp().value());
ASSERT_EQ(2, t2.timestamp().value());
@@ -289,7 +289,7 @@ TEST_F(MvccTest, TestScopedTransaction) {
// scope while the MvccManager is closed.
mgr.Close();
{
- ScopedTransaction t(&mgr, clock_->Now());
+ ScopedTransaction t(&mgr, clock_.Now());
NO_FATALS(t.StartApplying());
}
}
@@ -391,13 +391,13 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
MvccManager mgr;
// start several transactions and take snapshots along the way
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
- Timestamp tx3 = clock_->Now();
+ Timestamp tx3 = clock_.Now();
mgr.StartTransaction(tx3);
- mgr.AdjustSafeTime(clock_->Now());
+ mgr.AdjustSafeTime(clock_.Now());
ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(1)));
ASSERT_FALSE(mgr.AreAllTransactionsCommitted(Timestamp(2)));
@@ -427,10 +427,10 @@ TEST_F(MvccTest, TestAreAllTransactionsCommitted) {
ASSERT_TRUE(mgr.AreAllTransactionsCommitted(Timestamp(3)));
}
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapWithNoInflights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapWithNoInflights) {
MvccManager mgr;
- Timestamp to_wait_for = clock_->Now();
- mgr.AdjustSafeTime(clock_->Now());
+ Timestamp to_wait_for = clock_.Now();
+ mgr.AdjustSafeTime(clock_.Now());
thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this,
&mgr, to_wait_for);
// join immediately.
@@ -438,19 +438,19 @@ TEST_F(MvccTest,
TestWaitForCleanSnapshot_SnapWithNoInflights) {
ASSERT_TRUE(HasResultSnapshot());
}
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapBeforeSafeTimeWithInFlights) {
MvccManager mgr;
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
mgr.AdjustSafeTime(tx2);
- Timestamp to_wait_for = clock_->Now();
+ Timestamp to_wait_for = clock_.Now();
// Select a safe time that is after all transactions and after the the
timestamp we'll wait for
// and adjust it on the MvccManager. This will cause "clean time" to move
when tx1 and tx2 commit.
- Timestamp safe_time = clock_->Now();
+ Timestamp safe_time = clock_.Now();
mgr.AdjustSafeTime(safe_time);
thread waiting_thread = thread(&MvccTest::WaitForSnapshotAtTSThread, this,
&mgr, to_wait_for);
@@ -465,11 +465,11 @@ TEST_F(MvccTest,
TestWaitForCleanSnapshot_SnapBeforeSafeTimeWithInFlights) {
ASSERT_TRUE(HasResultSnapshot());
}
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapAfterSafeTimeWithInFlights) {
MvccManager mgr;
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
mgr.AdjustSafeTime(tx2);
@@ -499,15 +499,15 @@ TEST_F(MvccTest,
TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights) {
ASSERT_OK(s);
}
-TEST_F(MvccTest, TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
+TEST_F(MvccTest, WaitForCleanSnapshotSnapAtTimestampWithInFlights) {
MvccManager mgr;
// Transactions with timestamp 1 through 3
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
- Timestamp tx3 = clock_->Now();
+ Timestamp tx3 = clock_.Now();
mgr.StartTransaction(tx3);
// Start a thread waiting for transactions with ts <= 2 to commit
@@ -540,9 +540,9 @@ TEST_F(MvccTest,
TestWaitForCleanSnapshot_SnapAtTimestampWithInFlights) {
TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
MvccManager mgr;
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
mgr.AdjustSafeTime(tx2);
@@ -572,7 +572,7 @@ TEST_F(MvccTest, TestWaitForApplyingTransactionsToCommit) {
// instead return an error.
TEST_F(MvccTest, TestDontWaitAfterClose) {
MvccManager mgr;
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
mgr.AdjustSafeTime(tx1);
mgr.StartApplyingTransaction(tx1);
@@ -612,11 +612,11 @@ TEST_F(MvccTest, TestTxnAbort) {
MvccManager mgr;
// Transactions with timestamps 1 through 3
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
- Timestamp tx2 = clock_->Now();
+ Timestamp tx2 = clock_.Now();
mgr.StartTransaction(tx2);
- Timestamp tx3 = clock_->Now();
+ Timestamp tx3 = clock_.Now();
mgr.StartTransaction(tx3);
mgr.AdjustSafeTime(tx3);
@@ -645,7 +645,7 @@ TEST_F(MvccTest, TestTxnAbort) {
TEST_F(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit) {
MvccManager mgr;
- clock_->Update(Timestamp(20));
+ clock_.Update(Timestamp(20));
mgr.StartTransaction(Timestamp(10));
mgr.StartTransaction(Timestamp(15));
@@ -683,7 +683,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
"Trying to commit a transaction with a future timestamp|"
"Trying to remove timestamp which isn't in the in-flight set: 1");
- clock_->Update(Timestamp(20));
+ clock_.Update(Timestamp(20));
EXPECT_DEATH({
mgr.CommitTransaction(Timestamp(1));
@@ -691,7 +691,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
// Start a transaction, and try committing it without having moved to
"Applying"
// state.
- Timestamp t = clock_->Now();
+ Timestamp t = clock_.Now();
mgr.StartTransaction(t);
EXPECT_DEATH({
mgr.CommitTransaction(t);
@@ -706,7 +706,7 @@ TEST_F(MvccTest, TestIllegalStateTransitionsCrash) {
}, "Trying to remove timestamp which isn't in the in-flight set: 21");
// Start a new transaction. This time, mark it as Applying.
- t = clock_->Now();
+ t = clock_.Now();
mgr.StartTransaction(t);
mgr.AdjustSafeTime(t);
mgr.StartApplyingTransaction(t);
@@ -729,7 +729,7 @@ TEST_F(MvccTest, TestWaitUntilCleanDeadline) {
MvccManager mgr;
// Transactions with timestamp 1
- Timestamp tx1 = clock_->Now();
+ Timestamp tx1 = clock_.Now();
mgr.StartTransaction(tx1);
// Wait until the 'tx1' timestamp is clean -- this won't happen because the
@@ -758,10 +758,12 @@ TEST_F(MvccTest, TestCorrectInitWithNoTxns) {
EXPECT_EQ(snap.committed_timestamps_.size(), 0);
// Read the clock a few times to advance the timestamp
- for (int i = 0; i < 10; i++) clock_->Now();
+ for (int i = 0; i < 10; i++) {
+ clock_.Now();
+ }
// Advance the safe timestamp.
- Timestamp new_safe_time = clock_->Now();
+ Timestamp new_safe_time = clock_.Now();
mgr.AdjustSafeTime(new_safe_time);
// Test that the snapshot reports that a timestamp lower than the safe time
diff --git a/src/kudu/tablet/mvcc.h b/src/kudu/tablet/mvcc.h
index 62ccf13..65336d4 100644
--- a/src/kudu/tablet/mvcc.h
+++ b/src/kudu/tablet/mvcc.h
@@ -297,7 +297,7 @@ class MvccManager {
FRIEND_TEST(MvccTest, TestTxnAbort);
FRIEND_TEST(MvccTest, TestAutomaticCleanTimeMoveToSafeTimeOnCommit);
FRIEND_TEST(MvccTest, TestWaitForApplyingTransactionsToCommit);
- FRIEND_TEST(MvccTest,
TestWaitForCleanSnapshot_SnapAfterSafeTimeWithInFlights);
+ FRIEND_TEST(MvccTest, WaitForCleanSnapshotSnapAfterSafeTimeWithInFlights);
FRIEND_TEST(MvccTest, TestDontWaitAfterClose);
enum TxnState {
diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h
index 2a60cdd..c3197e9 100644
--- a/src/kudu/tablet/tablet-harness.h
+++ b/src/kudu/tablet/tablet-harness.h
@@ -34,6 +34,8 @@
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
+METRIC_DECLARE_entity(server);
+
namespace kudu {
namespace tablet {
@@ -67,13 +69,11 @@ class TabletHarness {
: env(Env::Default()),
tablet_id("test_tablet_id"),
root_dir(std::move(root_dir)),
- enable_metrics(true),
clock_type(LOGICAL_CLOCK) {}
Env* env;
std::string tablet_id;
std::string root_dir;
- bool enable_metrics;
ClockType clock_type;
};
@@ -103,21 +103,25 @@ class TabletHarness {
/*extra_config=*/ boost::none,
/*dimension_label=*/
boost::none,
&metadata));
- if (options_.enable_metrics) {
- metrics_registry_.reset(new MetricRegistry());
- }
-
- if (options_.clock_type == Options::LOGICAL_CLOCK) {
- clock_ =
clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp);
- } else {
- clock_.reset(new clock::HybridClock());
- RETURN_NOT_OK(clock_->Init());
+ metrics_registry_.reset(new MetricRegistry);
+ metric_entity_ = METRIC_ENTITY_server.Instantiate(metrics_registry_.get(),
+ "tablet-harness");
+
+ switch (options_.clock_type) {
+ case Options::HYBRID_CLOCK:
+ clock_.reset(new clock::HybridClock(metric_entity_));
+ break;
+ case Options::LOGICAL_CLOCK:
+ clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
+ metric_entity_));
+ break;
}
+ RETURN_NOT_OK(clock_->Init());
tablet_.reset(new Tablet(metadata,
clock_.get(),
- std::shared_ptr<MemTracker>(),
+ {},
metrics_registry_.get(),
- make_scoped_refptr(new
log::LogAnchorRegistry())));
+ make_scoped_refptr(new log::LogAnchorRegistry)));
return Status::OK();
}
@@ -149,7 +153,8 @@ class TabletHarness {
private:
Options options_;
- gscoped_ptr<MetricRegistry> metrics_registry_;
+ std::unique_ptr<MetricRegistry> metrics_registry_;
+ scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<clock::Clock> clock_;
Schema schema_;
diff --git a/src/kudu/tablet/tablet-test-util.h
b/src/kudu/tablet/tablet-test-util.h
index 3198b3d..5e0bcc9 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -111,7 +111,6 @@ class KuduTabletTest : public KuduTest {
void CreateTestTablet(const std::string& root_dir = "") {
std::string dir = root_dir.empty() ? GetTestPath("fs_root") : root_dir;
TabletHarness::Options opts(dir);
- opts.enable_metrics = true;
opts.clock_type = clock_type_;
bool first_time = harness_ == nullptr;
harness_.reset(new TabletHarness(schema_, opts));
diff --git a/src/kudu/tools/tool_action_perf.cc
b/src/kudu/tools/tool_action_perf.cc
index c961c8a..a547a26 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -186,7 +186,6 @@
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/client/write_op.h"
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
@@ -241,7 +240,6 @@ using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
-using kudu::clock::Clock;
using kudu::clock::LogicalClock;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
@@ -814,8 +812,8 @@ Status TabletScan(const RunnerContext& context) {
scoped_refptr<ConsensusMetadata> cmeta;
RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
- unique_ptr<Clock>
clock(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp));
- RETURN_NOT_OK(clock->Init());
+ LogicalClock clock(Timestamp::kInitialTimestamp);
+ RETURN_NOT_OK(clock.Init());
scoped_refptr<LogAnchorRegistry> registry(new LogAnchorRegistry());
@@ -825,7 +823,7 @@ Status TabletScan(const RunnerContext& context) {
ConsensusBootstrapInfo cbi;
RETURN_NOT_OK(tablet::BootstrapTablet(std::move(tmeta),
cmeta->CommittedConfig(),
- clock.get(),
+ &clock,
/*mem_tracker=*/ nullptr,
/*result_tracker=*/ nullptr,
/*metric_registry=*/ nullptr,