This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ad7f591 time_manager: remove shared ownership
ad7f591 is described below
commit ad7f5914a943e60b829508468666e6f5d9c64b3d
Author: Adar Dembo <[email protected]>
AuthorDate: Fri Jan 10 11:21:08 2020 -0800
time_manager: remove shared ownership
This object also didn't need shared ownership; it can be exclusively owned
by RaftConsensus, which hands out raw pointers to all subordinate objects.
Change-Id: I871debdeaf5c8b92168e764b4a6142319f229438
Reviewed-on: http://gerrit.cloudera.org:8080/15007
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Adar Dembo <[email protected]>
---
src/kudu/consensus/consensus_peers-test.cc | 5 +++--
src/kudu/consensus/consensus_queue-test.cc | 5 +++--
src/kudu/consensus/consensus_queue.cc | 4 ++--
src/kudu/consensus/consensus_queue.h | 4 ++--
src/kudu/consensus/pending_rounds.cc | 4 ++--
src/kudu/consensus/pending_rounds.h | 6 +++---
src/kudu/consensus/raft_consensus.cc | 8 +++++---
src/kudu/consensus/raft_consensus.h | 8 ++++----
src/kudu/consensus/raft_consensus_quorum-test.cc | 4 ++--
src/kudu/consensus/time_manager-test.cc | 5 ++---
src/kudu/consensus/time_manager.h | 3 +--
src/kudu/tablet/tablet_replica.cc | 3 ++-
src/kudu/tablet/tablet_replica.h | 4 ++--
src/kudu/tserver/tablet_service.cc | 9 ++++-----
14 files changed, 37 insertions(+), 35 deletions(-)
diff --git a/src/kudu/consensus/consensus_peers-test.cc
b/src/kudu/consensus/consensus_peers-test.cc
index 4a380b8..a05c921 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -94,12 +94,12 @@ class ConsensusPeersTest : public KuduTest {
clock_.reset(new clock::HybridClock());
ASSERT_OK(clock_->Init());
- scoped_refptr<TimeManager> time_manager(new TimeManager(clock_.get(),
Timestamp::kMin));
+ time_manager_.reset(new TimeManager(clock_.get(), Timestamp::kMin));
message_queue_.reset(new PeerMessageQueue(
metric_entity_,
log_.get(),
- time_manager,
+ time_manager_.get(),
FakeRaftPeerPB(kLeaderUuid),
kTabletId,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
@@ -166,6 +166,7 @@ class ConsensusPeersTest : public KuduTest {
unique_ptr<FsManager> fs_manager_;
scoped_refptr<Log> log_;
unique_ptr<ThreadPool> raft_pool_;
+ unique_ptr<TimeManager> time_manager_;
unique_ptr<PeerMessageQueue> message_queue_;
const Schema schema_;
LogOptions options_;
diff --git a/src/kudu/consensus/consensus_queue-test.cc
b/src/kudu/consensus/consensus_queue-test.cc
index f6e6c1a..b47c52e 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -97,15 +97,15 @@ class ConsensusQueueTest : public KuduTest {
ASSERT_OK(clock_->Init());
ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
+ time_manager_.reset(new TimeManager(clock_.get(), Timestamp::kMin));
CloseAndReopenQueue(MinimumOpId(), MinimumOpId());
}
void CloseAndReopenQueue(const OpId& replicated_opid, const OpId&
committed_opid) {
- scoped_refptr<TimeManager> time_manager(new TimeManager(clock_.get(),
Timestamp::kMin));
queue_.reset(new PeerMessageQueue(
metric_entity_,
log_.get(),
- time_manager,
+ time_manager_.get(),
FakeRaftPeerPB(kLeaderUuid),
kTestTablet,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
@@ -229,6 +229,7 @@ class ConsensusQueueTest : public KuduTest {
scoped_refptr<MetricEntity> metric_entity_;
scoped_refptr<log::Log> log_;
unique_ptr<ThreadPool> raft_pool_;
+ unique_ptr<TimeManager> time_manager_;
gscoped_ptr<PeerMessageQueue> queue_;
scoped_refptr<log::LogAnchorRegistry> registry_;
unique_ptr<clock::Clock> clock_;
diff --git a/src/kudu/consensus/consensus_queue.cc
b/src/kudu/consensus/consensus_queue.cc
index a051f61..c655d5f 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -152,7 +152,7 @@ PeerMessageQueue::Metrics::Metrics(const
scoped_refptr<MetricEntity>& metric_ent
PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>&
metric_entity,
scoped_refptr<log::Log> log,
- scoped_refptr<TimeManager> time_manager,
+ TimeManager* time_manager,
RaftPeerPB local_peer_pb,
string tablet_id,
unique_ptr<ThreadPoolToken>
raft_pool_observers_token,
@@ -164,7 +164,7 @@ PeerMessageQueue::PeerMessageQueue(const
scoped_refptr<MetricEntity>& metric_ent
successor_watch_in_progress_(false),
log_cache_(metric_entity, std::move(log),
local_peer_pb_.permanent_uuid(), tablet_id_),
metrics_(metric_entity),
- time_manager_(std::move(time_manager)) {
+ time_manager_(time_manager) {
DCHECK(local_peer_pb_.has_permanent_uuid());
DCHECK(local_peer_pb_.has_last_known_addr());
DCHECK(last_locally_replicated.IsInitialized());
diff --git a/src/kudu/consensus/consensus_queue.h
b/src/kudu/consensus/consensus_queue.h
index 2b9e546..d2257bc 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -182,7 +182,7 @@ class PeerMessageQueue {
PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
scoped_refptr<log::Log> log,
- scoped_refptr<TimeManager> time_manager,
+ TimeManager* time_manager,
RaftPeerPB local_peer_pb,
std::string tablet_id,
std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
@@ -569,7 +569,7 @@ class PeerMessageQueue {
Metrics metrics_;
- scoped_refptr<TimeManager> time_manager_;
+ TimeManager* time_manager_;
};
// The interface between RaftConsensus and the PeerMessageQueue.
diff --git a/src/kudu/consensus/pending_rounds.cc
b/src/kudu/consensus/pending_rounds.cc
index 940eb05..863f0be 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -47,10 +47,10 @@ namespace consensus {
// PendingRounds
//------------------------------------------------------------
-PendingRounds::PendingRounds(string log_prefix, scoped_refptr<TimeManager>
time_manager)
+PendingRounds::PendingRounds(string log_prefix, TimeManager* time_manager)
: log_prefix_(std::move(log_prefix)),
last_committed_op_id_(MinimumOpId()),
- time_manager_(std::move(time_manager)) {}
+ time_manager_(time_manager) {}
PendingRounds::~PendingRounds() {
}
diff --git a/src/kudu/consensus/pending_rounds.h
b/src/kudu/consensus/pending_rounds.h
index c1043b6..5bc310a 100644
--- a/src/kudu/consensus/pending_rounds.h
+++ b/src/kudu/consensus/pending_rounds.h
@@ -22,6 +22,7 @@
#include <string>
#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
@@ -29,7 +30,6 @@ namespace kudu {
class Status;
namespace consensus {
-class ConsensusRound;
class TimeManager;
// Tracks the pending consensus rounds being managed by a Raft replica (either
leader
@@ -41,7 +41,7 @@ class TimeManager;
// We should consolidate to "round".
class PendingRounds {
public:
- PendingRounds(std::string log_prefix, scoped_refptr<TimeManager>
time_manager);
+ PendingRounds(std::string log_prefix, TimeManager* time_manager);
~PendingRounds();
// Set the committed op during startup. This should be done after
@@ -109,7 +109,7 @@ class PendingRounds {
// The OpId of the round that was last committed. Initialized to
MinimumOpId().
OpId last_committed_op_id_;
- scoped_refptr<TimeManager> time_manager_;
+ TimeManager* time_manager_;
DISALLOW_COPY_AND_ASSIGN(PendingRounds);
};
diff --git a/src/kudu/consensus/raft_consensus.cc
b/src/kudu/consensus/raft_consensus.cc
index 8f83ed8..618e542 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -46,6 +46,7 @@
#include "kudu/consensus/peer_manager.h"
#include "kudu/consensus/pending_rounds.h"
#include "kudu/consensus/quorum_util.h"
+#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/macros.h"
@@ -225,7 +226,7 @@ Status RaftConsensus::Create(ConsensusOptions options,
Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
unique_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
- scoped_refptr<TimeManager> time_manager,
+ unique_ptr<TimeManager> time_manager,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
Callback<void(const string& reason)>
mark_dirty_clbk) {
@@ -272,7 +273,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo&
info,
unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
metric_entity,
log_,
- time_manager_,
+ time_manager_.get(),
local_peer_pb_,
options_.tablet_id,
raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
@@ -288,7 +289,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo&
info,
raft_pool_token_.get(),
log_));
- unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(),
time_manager_));
+ unique_ptr<PendingRounds> pending(new PendingRounds(
+ LogPrefixThreadSafe(), time_manager_.get()));
// Capture a weak_ptr reference into the functor so it can safely handle
// outliving the consensus instance.
diff --git a/src/kudu/consensus/raft_consensus.h
b/src/kudu/consensus/raft_consensus.h
index b3a8980..2b3d704 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -37,7 +37,6 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
@@ -76,6 +75,7 @@ class ConsensusRoundHandler;
class PeerManager;
class PeerProxyFactory;
class PendingRounds;
+class TimeManager;
struct ConsensusBootstrapInfo;
struct ElectionResult;
@@ -161,7 +161,7 @@ class RaftConsensus : public
std::enable_shared_from_this<RaftConsensus>,
Status Start(const ConsensusBootstrapInfo& info,
std::unique_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
- scoped_refptr<TimeManager> time_manager,
+ std::unique_ptr<TimeManager> time_manager,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
Callback<void(const std::string& reason)> mark_dirty_clbk);
@@ -332,7 +332,7 @@ class RaftConsensus : public
std::enable_shared_from_this<RaftConsensus>,
// Thread-safe.
const std::string& tablet_id() const;
- scoped_refptr<TimeManager> time_manager() const { return time_manager_; }
+ TimeManager* time_manager() const { return time_manager_.get(); }
// Returns a copy of the state of the consensus system.
// If 'report_health' is set to 'INCLUDE_HEALTH_REPORT', and if the
@@ -864,7 +864,7 @@ class RaftConsensus : public
std::enable_shared_from_this<RaftConsensus>,
std::unique_ptr<ThreadPoolToken> raft_pool_token_;
scoped_refptr<log::Log> log_;
- scoped_refptr<TimeManager> time_manager_;
+ std::unique_ptr<TimeManager> time_manager_;
std::unique_ptr<PeerProxyFactory> peer_proxy_factory_;
// When we receive a message from a remote peer telling us to start a
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index d2676f8..4615ff5 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -215,7 +215,7 @@ class RaftConsensusQuorumTest : public KuduTest {
unique_ptr<PeerProxyFactory> proxy_factory(
new LocalTestPeerProxyFactory(peers_.get()));
- scoped_refptr<TimeManager> time_manager(
+ unique_ptr<TimeManager> time_manager(
new TimeManager(clock_.get(), Timestamp::kMin));
unique_ptr<TestTransactionFactory> txn_factory(
new TestTransactionFactory(logs_[i].get()));
@@ -226,7 +226,7 @@ class RaftConsensusQuorumTest : public KuduTest {
boot_info,
std::move(proxy_factory),
logs_[i],
- time_manager,
+ std::move(time_manager),
txn_factories_.back().get(),
metric_entity_,
Bind(&DoNothing)));
diff --git a/src/kudu/consensus/time_manager-test.cc
b/src/kudu/consensus/time_manager-test.cc
index f5f27e9..c1f37d1 100644
--- a/src/kudu/consensus/time_manager-test.cc
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -27,7 +27,6 @@
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/consensus.pb.h"
-#include "kudu/gutil/ref_counted.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -43,7 +42,7 @@ namespace consensus {
class TimeManagerTest : public KuduTest {
public:
- TimeManagerTest() : clock_(new clock::HybridClock()) {}
+ TimeManagerTest() : clock_(new clock::HybridClock) {}
void SetUp() override {
CHECK_OK(clock_->Init());
@@ -74,7 +73,7 @@ class TimeManagerTest : public KuduTest {
}
unique_ptr<clock::HybridClock> clock_;
- scoped_refptr<TimeManager> time_manager_;
+ unique_ptr<TimeManager> time_manager_;
vector<unique_ptr<CountDownLatch>> latches_;
vector<thread> threads_;
};
diff --git a/src/kudu/consensus/time_manager.h
b/src/kudu/consensus/time_manager.h
index 45304c1..46fa63c 100644
--- a/src/kudu/consensus/time_manager.h
+++ b/src/kudu/consensus/time_manager.h
@@ -23,7 +23,6 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/timestamp.h"
-#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -71,7 +70,7 @@ class ReplicateMsg;
// This anomaly can cause non-repeatable reads in certain conditions.
//
// This class is thread safe.
-class TimeManager : public RefCountedThreadSafe<TimeManager> {
+class TimeManager {
public:
// Constructs a TimeManager in non-leader mode.
diff --git a/src/kudu/tablet/tablet_replica.cc
b/src/kudu/tablet/tablet_replica.cc
index b990ef8..bf4ca61 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -37,6 +37,7 @@
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
+#include "kudu/consensus/time_manager.h"
#include "kudu/fs/data_dirs.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/bind.h"
@@ -183,7 +184,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo&
bootstrap_info,
scoped_refptr<MetricEntity> metric_entity;
unique_ptr<PeerProxyFactory> peer_proxy_factory;
- scoped_refptr<TimeManager> time_manager;
+ unique_ptr<TimeManager> time_manager;
{
std::lock_guard<simple_spinlock> l(lock_);
CHECK_EQ(BOOTSTRAPPING, state_);
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 915fad7..abc9079 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -29,7 +29,6 @@
#include "kudu/consensus/log.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/raft_consensus.h"
-#include "kudu/consensus/time_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/callback.h"
#include "kudu/gutil/gscoped_ptr.h"
@@ -58,6 +57,7 @@ class Callback;
namespace consensus {
class ConsensusMetadataManager;
+class TimeManager;
class TransactionStatusPB;
}
@@ -176,7 +176,7 @@ class TabletReplica : public
RefCountedThreadSafe<TabletReplica>,
return tablet_.get();
}
- scoped_refptr<consensus::TimeManager> time_manager() const {
+ consensus::TimeManager* time_manager() const {
return consensus_->time_manager();
}
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 93c6d9c..f642a7e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -2435,10 +2435,9 @@ Status
TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
}
case READ_YOUR_WRITES: // Fallthrough intended
case READ_AT_SNAPSHOT: {
- scoped_refptr<consensus::TimeManager> time_manager =
replica->time_manager();
- s = HandleScanAtSnapshot(scan_pb, rpc_context, projection,
tablet.get(),
- time_manager.get(), &iter,
&snap_start_timestamp, snap_timestamp,
- error_code);
+ s = HandleScanAtSnapshot(
+ scan_pb, rpc_context, projection, tablet.get(),
replica->time_manager(),
+ &iter, &snap_start_timestamp, snap_timestamp, error_code);
break;
}
}
@@ -2734,7 +2733,7 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const
NewScanRequestPB& scan_pb,
const RpcContext* rpc_context,
const Schema& projection,
Tablet* tablet,
- consensus::TimeManager*
time_manager,
+ TimeManager* time_manager,
unique_ptr<RowwiseIterator>*
iter,
boost::optional<Timestamp>*
snap_start_timestamp,
Timestamp* snap_timestamp,