This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f334e6e083203c96e56957e3d331adbc33d3c8a4 Author: Andrew Wong <[email protected]> AuthorDate: Tue Nov 2 17:14:04 2021 -0700 [txns] fix UB in TxnSystemClient when adding max timeout to now This patch addresses the following UB found in a pre-commit: /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10: runtime error: signed integer overflow: 271833850110 + 9223372036854775807 cannot be represented in type 'long' #0 0x7f225fca9b31 in kudu::MonoTime::AddDelta(kudu::MonoDelta const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10 #1 0x7f225fcaaafe in kudu::operator+(kudu::MonoTime const&, kudu::MonoDelta const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:335:7 #2 0x7f226fa3d6ff in kudu::transactions::TxnSystemClient::CoordinateTransactionAsync(kudu::tserver::CoordinatorOpPB, kudu::MonoDelta, std::function<void (kudu::Status const&)> const&, kudu::tserver::CoordinatorOpResultPB*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/transactions/txn_system_client.cc:331:45 #3 0x7f226fa3feca in kudu::transactions::TxnSystemClient::KeepTransactionAlive(long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, kudu::MonoDelta) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/transactions/txn_system_client.cc:320:3 #4 0x7f2271211629 in kudu::transactions::TxnManager::KeepTransactionAlive(long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, kudu::MonoTime const&) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/master/txn_manager.cc:238:27 #5 0x7f227121535f in kudu::transactions::TxnManagerServiceImpl::KeepTransactionAlive(kudu::transactions::KeepTransactionAliveRequestPB const*, kudu::transactions::KeepTransactionAliveResponsePB*, kudu::rpc::RpcContext*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/master/txn_manager_service.cc:159:42 #6 0x7f2265d5749e in std::function<void (google::protobuf::Message const*, google::protobuf::Message*, kudu::rpc::RpcContext*)>::operator()(google::protobuf::Message const*, google::protobuf::Message*, kudu::rpc::RpcContext*) const ../../../include/c++/7.5.0/bits/std_function.h:706:14 #7 0x7f2265d5648c in kudu::rpc::GeneratedServiceIf::Handle(kudu::rpc::InboundCall*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/rpc/service_if.cc:137:3 #8 0x7f2265d5c42d in kudu::rpc::ServicePool::RunThread() /home/jenkins-slave/workspace/kudu-master/1/src/kudu/rpc/service_pool.cc:232:15 #9 0x7f225fd596ba in kudu::Thread::SuperviseThread(void*) /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/thread.cc:674:3 #10 0x7f22625026da in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x76da) #11 0x7f225bfea71e in clone (/lib/x86_64-linux-gnu/libc.so.6+0x12171e) SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /home/jenkins-slave/workspace/kudu-master/1/src/kudu/util/monotime.cc:220:10 in Previously, we converted an initial deadline to a timeout, potentially rejiggering the value in case of the maximal timeout, and then recomputed the deadline. This patch addresses the UB by addressing a TODO to pass deadlines in the context of the TxnSystemClient instead of timeouts. Change-Id: I1e5d4d06e8c0801c7f6b2399f7622e6f039f988e Reviewed-on: http://gerrit.cloudera.org:8080/17993 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> --- src/kudu/integration-tests/txn_commit-itest.cc | 2 +- .../integration-tests/txn_participant-itest.cc | 99 +++++++++++++--------- .../integration-tests/txn_status_table-itest.cc | 12 +-- src/kudu/master/master.cc | 6 +- src/kudu/master/master.h | 3 +- src/kudu/master/txn_manager-test.cc | 2 +- src/kudu/master/txn_manager.cc | 24 ++---- src/kudu/tools/tool_action_txn.cc | 3 +- src/kudu/transactions/txn_status_manager.cc | 6 +- src/kudu/transactions/txn_system_client.cc | 58 ++++++------- src/kudu/transactions/txn_system_client.h | 22 ++--- src/kudu/tserver/ts_tablet_manager.cc | 4 +- 12 files changed, 121 insertions(+), 120 deletions(-) diff --git a/src/kudu/integration-tests/txn_commit-itest.cc b/src/kudu/integration-tests/txn_commit-itest.cc index b833959..9b52867 100644 --- a/src/kudu/integration-tests/txn_commit-itest.cc +++ b/src/kudu/integration-tests/txn_commit-itest.cc @@ -676,7 +676,7 @@ TEST_F(TxnCommitITest, TestCommitAfterParticipantAbort) { op_pb.set_txn_id(0); op_pb.set_type(ParticipantOpPB::ABORT_TXN); ASSERT_OK(txn_client_->ParticipateInTransaction( - participant_ids_[0], op_pb, MonoDelta::FromSeconds(3))); + participant_ids_[0], op_pb, MonoTime::Now() + MonoDelta::FromSeconds(3))); // When we try to commit, we should end up not completing. ASSERT_OK(txn->StartCommit()); diff --git a/src/kudu/integration-tests/txn_participant-itest.cc b/src/kudu/integration-tests/txn_participant-itest.cc index 51dbd24..7a3c7b1 100644 --- a/src/kudu/integration-tests/txn_participant-itest.cc +++ b/src/kudu/integration-tests/txn_participant-itest.cc @@ -847,32 +847,39 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) { // Get commit-related metadata. TxnMetadataPB meta_pb; Status s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), + MonoTime::Now() + kDefaultTimeout, /*begin_commit_timestamp*/nullptr, &meta_pb); ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString(); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), + MonoTime::Now() + kDefaultTimeout, /*begin_commit_timestamp*/nullptr, &meta_pb)); ASSERT_FALSE(meta_pb.has_aborted()); ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp()); ASSERT_FALSE(meta_pb.has_commit_timestamp()); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), + MonoTime::Now() + kDefaultTimeout, /*begin_commit_timestamp*/nullptr, &meta_pb)); ASSERT_FALSE(meta_pb.has_aborted()); ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp()); ASSERT_FALSE(meta_pb.has_commit_timestamp()); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::GET_METADATA), + MonoTime::Now() + kDefaultTimeout, /*begin_commit_timestamp*/nullptr, &meta_pb)); ASSERT_FALSE(meta_pb.has_aborted()); ASSERT_TRUE(meta_pb.has_commit_mvcc_op_timestamp()); @@ -881,11 +888,14 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientGetMetadata) { // Get abort-related metadata. constexpr const auto kAbortedTxnId = 1; ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::ABORT_TXN), + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::GET_METADATA), kDefaultTimeout, + tablet_id, MakeParticipantOp(kAbortedTxnId, ParticipantOpPB::GET_METADATA), + MonoTime::Now() + kDefaultTimeout, /*begin_commit_timestamp*/nullptr, &meta_pb)); ASSERT_TRUE(meta_pb.has_aborted()); ASSERT_FALSE(meta_pb.has_commit_mvcc_op_timestamp()); @@ -909,13 +919,15 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientBeginTxnDoesntLock) { cluster_->messenger()->sasl_proto_name(), &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kFirstTxn, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 } })); // Begin another transaction with a lower txn ID. This is allowed, since // partition locks are only taken once we write. ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kSecondTxn, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kFirstTxn, kOpen, -1 }, { kSecondTxn, kOpen, -1 } })); } @@ -935,17 +947,20 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) { cluster_->messenger()->sasl_proto_name(), &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } })); // Try some illegal ops and ensure we get an error. Status s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), kDefaultTimeout); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT), + MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } })); ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kOpen, -1 } })); // Progress the transaction forward, and perform similar checks that we get @@ -953,12 +968,13 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) { Timestamp begin_commit_ts; ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), - kDefaultTimeout, &begin_commit_ts)); + MonoTime::Now() + kDefaultTimeout, &begin_commit_ts)); ASSERT_NE(Timestamp::kInvalidTimestamp, begin_commit_ts); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } })); s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), kDefaultTimeout); + tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), + MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } })); @@ -967,25 +983,25 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientCommitSequence) { Timestamp refetched_begin_commit_ts; ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_COMMIT), - kDefaultTimeout, &refetched_begin_commit_ts)); + MonoTime::Now() + kDefaultTimeout, &refetched_begin_commit_ts)); ASSERT_EQ(refetched_begin_commit_ts, begin_commit_ts); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnId, kCommitInProgress, -1 } })); // Once we finish committing, we should be unable to begin or abort. ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::FINALIZE_COMMIT, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}})); for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::ABORT_TXN }) { Status s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnId, type), kDefaultTimeout); + tablet_id, MakeParticipantOp(kTxnId, type), MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); } NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}})); for (const auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) { ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, type, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); } NO_FATALS(CheckReplicasMatchTxns(replicas, {{kTxnId, kCommitted, kDummyCommitTimestamp}})); } @@ -1007,34 +1023,34 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) { &txn_client)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::BEGIN_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kOpen, -1 }, { kTxnTwo, kOpen, -1 } })); // Once we abort, we should be unable to do anything further. ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kAborted, -1 }, { kTxnTwo, kCommitInProgress, -1 } })); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } })); for (const auto type : { ParticipantOpPB::BEGIN_TXN, ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) { Status s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnOne, type), kDefaultTimeout); + tablet_id, MakeParticipantOp(kTxnOne, type), MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kTxnTwo, type), kDefaultTimeout); + tablet_id, MakeParticipantOp(kTxnTwo, type), MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); } NO_FATALS(CheckReplicasMatchTxns(replicas, @@ -1042,10 +1058,10 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientAbortSequence) { // Repeated abort calls are idempotent. ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, ParticipantOpPB::ABORT_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, ParticipantOpPB::ABORT_TXN), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { kTxnOne, kAborted, -1 }, { kTxnTwo, kAborted, -1 } })); } @@ -1065,13 +1081,14 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientErrorWhenNotBegun) { for (auto type : { ParticipantOpPB::BEGIN_COMMIT, ParticipantOpPB::FINALIZE_COMMIT }) { Status s = txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(txn_id++, type), kDefaultTimeout); + tablet_id, MakeParticipantOp(txn_id++, type), MonoTime::Now() + kDefaultTimeout); ASSERT_TRUE(s.IsIllegalState()) << s.ToString(); NO_FATALS(CheckReplicasMatchTxns(replicas, {})); } ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN), kDefaultTimeout)); + tablet_id, MakeParticipantOp(txn_id++, ParticipantOpPB::ABORT_TXN), + MonoTime::Now() + kDefaultTimeout)); NO_FATALS(CheckReplicasMatchTxns(replicas, { { 2, kAborted, -1 } })); } @@ -1092,18 +1109,18 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientRepeatCalls) { for (const auto& type : kCommitSequence) { ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnOne, type, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); } for (const auto& type : kAbortSequence) { ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); ASSERT_OK(txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnTwo, type, kDummyCommitTimestamp), - kDefaultTimeout)); + MonoTime::Now() + kDefaultTimeout)); } NO_FATALS(CheckReplicasMatchTxns( replicas, { { kTxnOne, kCommitted, kDummyCommitTimestamp }, { kTxnTwo, kAborted, -1 } })); @@ -1127,7 +1144,7 @@ TEST_F(TxnParticipantITest, TestTxnSystemClientTimeoutWhenNoMajority) { &txn_client)); Status s = txn_client->ParticipateInTransaction( tablet_id, MakeParticipantOp(kTxnId, ParticipantOpPB::BEGIN_TXN), - MonoDelta::FromSeconds(1)); + MonoTime::Now() + MonoDelta::FromSeconds(1)); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); // We should have an initializing transaction until a majority is achieved, @@ -1162,7 +1179,7 @@ Status SendParticipantOps(TxnSystemClient* txn_client, const string& tablet_id, int txn_id = (*next_txn_id)++; for (const auto& op : kCommitSequence) { RETURN_NOT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(txn_id, op), kLongTimeout)); + tablet_id, MakeParticipantOp(txn_id, op), MonoTime::Now() + kLongTimeout)); } } return Status::OK(); @@ -1457,17 +1474,17 @@ TEST_F(TxnParticipantElectionStormITest, TestTxnSystemClientRetriesThroughStorm) // Start injecting latency to Raft-related traffic to spur elections. FLAGS_raft_enable_pre_election = false; - FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms;; + FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 * FLAGS_raft_heartbeat_interval_ms; SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2)); constexpr const int64_t kCommittedTxnId = 0; constexpr const int64_t kAbortedTxnId = 1; for (const auto& op : kCommitSequence) { ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kCommittedTxnId, op), kTimeout)); + tablet_id, MakeParticipantOp(kCommittedTxnId, op), MonoTime::Now() + kTimeout)); } for (const auto& op : kAbortSequence) { ASSERT_OK(txn_client->ParticipateInTransaction( - tablet_id, MakeParticipantOp(kAbortedTxnId, op), kTimeout)); + tablet_id, MakeParticipantOp(kAbortedTxnId, op), MonoTime::Now() + kTimeout)); } const vector<TxnParticipant::TxnEntry> expected_txns = { { kCommittedTxnId, kCommitted, kDummyCommitTimestamp }, diff --git a/src/kudu/integration-tests/txn_status_table-itest.cc b/src/kudu/integration-tests/txn_status_table-itest.cc index e18bc28..d9a03e2 100644 --- a/src/kudu/integration-tests/txn_status_table-itest.cc +++ b/src/kudu/integration-tests/txn_status_table-itest.cc @@ -482,7 +482,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientMasterDown) { int64_t highest_seen_txn_id = -1; auto s = txn_sys_client_->BeginTransaction( 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, - MonoDelta::FromMilliseconds(100)); + MonoTime::Now() + MonoDelta::FromMilliseconds(100)); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); // The 'highest_seen_txn_id' should be left untouched. ASSERT_EQ(-1, highest_seen_txn_id); @@ -503,7 +503,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientMasterDown) { int64_t highest_seen_txn_id = -1; ASSERT_OK(txn_sys_client_->BeginTransaction( 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, - MonoDelta::FromSeconds(10))); + MonoTime::Now() + MonoDelta::FromSeconds(10))); // Make sure the highest txn ID we've seen matches the one we just started. ASSERT_EQ(1, highest_seen_txn_id); } @@ -520,7 +520,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) { int64_t highest_seen_txn_id = -1; auto s = txn_sys_client_->BeginTransaction( 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, - MonoDelta::FromMilliseconds(100)); + MonoTime::Now() + MonoDelta::FromMilliseconds(100)); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); // The 'highest_seen_txn_id' should be left untouched. ASSERT_EQ(-1, highest_seen_txn_id); @@ -541,7 +541,7 @@ TEST_F(TxnStatusTableITest, TestSystemClientTServerDown) { int64_t highest_seen_txn_id = -1; ASSERT_OK(txn_sys_client_->BeginTransaction( 1, kUser, nullptr /* txn_keepalive_ms */, &highest_seen_txn_id, - MonoDelta::FromSeconds(10))); + MonoTime::Now() + MonoDelta::FromSeconds(10))); // Make sure the highest txn ID we've seen matches the one we just started. ASSERT_EQ(1, highest_seen_txn_id); } @@ -823,7 +823,7 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTable) { // // TODO(aserbin): change this to expected Status::OK() after implementing that auto s = txn_sys_client_->RegisterParticipant( - kNewTxnId, "txn_participant", kUser, MonoDelta::FromSeconds(10)); + kNewTxnId, "txn_participant", kUser, MonoTime::Now() + MonoDelta::FromSeconds(10)); ASSERT_TRUE(s.IsNotFound()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "No tablet covering the requested range partition"); @@ -850,7 +850,7 @@ TEST_F(TxnStatusTableITest, CheckOpenTxnStatusTableConcurrent) { ++success_count_begin; } s = txn_sys_client_->RegisterParticipant( - 0, kParticipant, kUser, MonoDelta::FromSeconds(10)); + 0, kParticipant, kUser, MonoTime::Now() + MonoDelta::FromSeconds(10)); if (s.ok()) { ++success_count_register; } diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc index c34821d..ea401b8 100644 --- a/src/kudu/master/master.cc +++ b/src/kudu/master/master.cc @@ -346,9 +346,9 @@ Status Master::InitTxnManager() { return Status::OK(); } -Status Master::WaitForTxnManagerInit(const MonoDelta& timeout) const { - if (timeout.Initialized()) { - const Status* s = txn_manager_init_status_.WaitFor(timeout); +Status Master::WaitForTxnManagerInit(MonoTime deadline) const { + if (deadline.Initialized()) { + const Status* s = txn_manager_init_status_.WaitFor(deadline - MonoTime::Now()); if (!s) { return Status::TimedOut("timed out waiting for TxnManager to initialize"); } diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h index 6ff30e8..46ed76c 100644 --- a/src/kudu/master/master.h +++ b/src/kudu/master/master.h @@ -35,6 +35,7 @@ namespace kudu { class HostPort; class MaintenanceManager; class MonoDelta; +class MonoTime; class ThreadPool; namespace rpc { @@ -76,7 +77,7 @@ class Master : public kserver::KuduServer { Status StartAsync(); Status WaitForCatalogManagerInit() const; - Status WaitForTxnManagerInit(const MonoDelta& timeout = {}) const; + Status WaitForTxnManagerInit(MonoTime deadline = {}) const; // Wait until this Master's catalog manager instance is the leader and is ready. // This method is intended for use by unit tests. diff --git a/src/kudu/master/txn_manager-test.cc b/src/kudu/master/txn_manager-test.cc index 642aef7..70818e7 100644 --- a/src/kudu/master/txn_manager-test.cc +++ b/src/kudu/master/txn_manager-test.cc @@ -157,7 +157,7 @@ TEST_F(TxnManagerTest, LazyInitialization) { // should not be initialized. { const MonoDelta kTimeout = MonoDelta::FromSeconds(1); - auto s = master_->WaitForTxnManagerInit(kTimeout); + auto s = master_->WaitForTxnManagerInit(MonoTime::Now() + kTimeout); ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); ASSERT_STR_CONTAINS(s.ToString(), "timed out waiting for TxnManager to initialize"); diff --git a/src/kudu/master/txn_manager.cc b/src/kudu/master/txn_manager.cc index 33477a0..d6463e0 100644 --- a/src/kudu/master/txn_manager.cc +++ b/src/kudu/master/txn_manager.cc @@ -18,7 +18,6 @@ #include "kudu/master/txn_manager.h" #include <cstdint> -#include <limits> #include <memory> #include <ostream> #include <vector> @@ -95,16 +94,6 @@ void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) { } } -// Conversion of a deadline specified for an RPC into a timeout, i.e. -// convert a point in time to a delta between current time and the specified -// point in time. -MonoDelta ToDelta(const MonoTime& deadline) { - MonoDelta timeout = deadline == MonoTime::Max() - ? MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max()) - : deadline - MonoTime::Now(); - return timeout; -} - } // anonymous namespace TxnManager::TxnManager(Master* server) @@ -147,7 +136,7 @@ Status TxnManager::BeginTransaction(const string& username, int64_t highest_seen_txn_id = -1; s = txn_sys_client_->BeginTransaction( try_txn_id, username, &keepalive_ms, - &highest_seen_txn_id, ToDelta(deadline)); + &highest_seen_txn_id, deadline); if (s.ok()) { DCHECK_GE(highest_seen_txn_id, 0); // The idea is to make the thread that has gotten a transaction reserved @@ -211,7 +200,7 @@ Status TxnManager::CommitTransaction(int64_t txn_id, const MonoTime& deadline) { RETURN_NOT_OK(CheckInitialized(deadline)); return txn_sys_client_->BeginCommitTransaction( - txn_id, username, ToDelta(deadline)); + txn_id, username, deadline); } Status TxnManager::GetTransactionState(int64_t txn_id, @@ -221,21 +210,21 @@ Status TxnManager::GetTransactionState(int64_t txn_id, DCHECK(txn_status); RETURN_NOT_OK(CheckInitialized(deadline)); return txn_sys_client_->GetTransactionStatus( - txn_id, username, txn_status, ToDelta(deadline)); + txn_id, username, txn_status, deadline); } Status TxnManager::AbortTransaction(int64_t txn_id, const string& username, const MonoTime& deadline) { RETURN_NOT_OK(CheckInitialized(deadline)); - return txn_sys_client_->AbortTransaction(txn_id, username, ToDelta(deadline)); + return txn_sys_client_->AbortTransaction(txn_id, username, deadline); } Status TxnManager::KeepTransactionAlive(int64_t txn_id, const string& username, const MonoTime& deadline) { RETURN_NOT_OK(CheckInitialized(deadline)); - return txn_sys_client_->KeepTransactionAlive(txn_id, username, ToDelta(deadline)); + return txn_sys_client_->KeepTransactionAlive(txn_id, username, deadline); } // This method isn't supposed to be called concurrently, so there isn't any @@ -291,8 +280,7 @@ Status TxnManager::CheckInitialized(const MonoTime& deadline) { // earlier the deadline, otherwise client might consider // the call timing out, but we want to deliver // ServiceUnavailable() status instead. - auto s = server_->WaitForTxnManagerInit( - deadline.Initialized() ? ToDelta(deadline) : MonoDelta()); + auto s = server_->WaitForTxnManagerInit(deadline); if (s.IsTimedOut()) { // The state of not-yet-initialized TxnManager is a transitional one, // so callers are assumed to retry and succeed eventually. diff --git a/src/kudu/tools/tool_action_txn.cc b/src/kudu/tools/tool_action_txn.cc index 346ea38..46b4a76 100644 --- a/src/kudu/tools/tool_action_txn.cc +++ b/src/kudu/tools/tool_action_txn.cc @@ -401,7 +401,8 @@ Status ShowTxn(const RunnerContext& context) { pb.set_txn_id(txn_id); pb.set_type(ParticipantOpPB::GET_METADATA); RETURN_NOT_OK(txn_client->ParticipateInTransaction( - id, pb, MonoDelta::FromMilliseconds(FLAGS_timeout_ms), nullptr, &meta_pb)); + id, pb, MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms), + /*begin_commit_timestamp=*/nullptr, &meta_pb)); vector<string> col_vals; for (const auto& field : participant_fields) { switch (field) { diff --git a/src/kudu/transactions/txn_status_manager.cc b/src/kudu/transactions/txn_status_manager.cc index d3889dd..52dc9bc 100644 --- a/src/kudu/transactions/txn_status_manager.cc +++ b/src/kudu/transactions/txn_status_manager.cc @@ -268,7 +268,7 @@ void CommitTasks::BeginCommitAsyncTask(int participant_idx) { txn_client_->ParticipateInTransactionAsync( participant_ids_[participant_idx], std::move(op_pb), - MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), + MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), std::move(participated_cb), &begin_commit_timestamps_[participant_idx]); } @@ -312,7 +312,7 @@ void CommitTasks::FinalizeCommitAsyncTask(int participant_idx) { txn_client_->ParticipateInTransactionAsync( participant_ids_[participant_idx], std::move(op_pb), - MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), + MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), std::move(participated_cb)); } @@ -354,7 +354,7 @@ void CommitTasks::AbortTxnAsyncTask(int participant_idx) { txn_client_->ParticipateInTransactionAsync( participant_ids_[participant_idx], std::move(op_pb), - MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), + MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_background_rpc_timeout_ms), std::move(participated_cb)); } diff --git a/src/kudu/transactions/txn_system_client.cc b/src/kudu/transactions/txn_system_client.cc index 87e16c0..f4776c1 100644 --- a/src/kudu/transactions/txn_system_client.cc +++ b/src/kudu/transactions/txn_system_client.cc @@ -194,9 +194,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id, const string& user, uint32_t* txn_keepalive_ms, int64_t* highest_seen_txn_id, - MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } CoordinatorOpPB coordinate_txn_op; coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_TXN); @@ -205,7 +205,7 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id, Synchronizer s; CoordinatorOpResultPB result; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback(), &result)); const auto ret = s.Wait(); @@ -225,9 +225,9 @@ Status TxnSystemClient::BeginTransaction(int64_t txn_id, } Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& participant_id, - const string& user, MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + const string& user, MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } CoordinatorOpPB coordinate_txn_op; coordinate_txn_op.set_type(CoordinatorOpPB::REGISTER_PARTICIPANT); @@ -236,16 +236,16 @@ Status TxnSystemClient::RegisterParticipant(int64_t txn_id, const string& partic coordinate_txn_op.set_user(user); Synchronizer s; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback())); return s.Wait(); } Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id, const string& user, - MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } CoordinatorOpPB coordinate_txn_op; coordinate_txn_op.set_type(CoordinatorOpPB::BEGIN_COMMIT_TXN); @@ -253,16 +253,16 @@ Status TxnSystemClient::BeginCommitTransaction(int64_t txn_id, coordinate_txn_op.set_user(user); Synchronizer s; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback())); return s.Wait(); } Status TxnSystemClient::AbortTransaction(int64_t txn_id, const string& user, - MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } CoordinatorOpPB coordinate_txn_op; coordinate_txn_op.set_type(CoordinatorOpPB::ABORT_TXN); @@ -270,7 +270,7 @@ Status TxnSystemClient::AbortTransaction(int64_t txn_id, coordinate_txn_op.set_user(user); Synchronizer s; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback())); return s.Wait(); } @@ -278,9 +278,9 @@ Status TxnSystemClient::AbortTransaction(int64_t txn_id, Status TxnSystemClient::GetTransactionStatus(int64_t txn_id, const string& user, TxnStatusEntryPB* txn_status, - MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } DCHECK(txn_status); CoordinatorOpPB coordinate_txn_op; @@ -290,7 +290,7 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id, Synchronizer s; CoordinatorOpResultPB result; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback(), &result)); const auto rs = s.Wait(); @@ -308,9 +308,9 @@ Status TxnSystemClient::GetTransactionStatus(int64_t txn_id, Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id, const string& user, - MonoDelta timeout) { - if (!timeout.Initialized()) { - timeout = MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); + MonoTime deadline) { + if (!deadline.Initialized()) { + deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_txn_system_client_op_timeout_ms); } CoordinatorOpPB coordinate_txn_op; coordinate_txn_op.set_type(CoordinatorOpPB::KEEP_TXN_ALIVE); @@ -318,17 +318,16 @@ Status TxnSystemClient::KeepTransactionAlive(int64_t txn_id, coordinate_txn_op.set_user(user); Synchronizer s; RETURN_NOT_OK(CoordinateTransactionAsync(std::move(coordinate_txn_op), - timeout, + deadline, s.AsStatusCallback())); return s.Wait(); } Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_txn_op, - MonoDelta timeout, + MonoTime deadline, const StatusCallback& cb, CoordinatorOpResultPB* result) { DCHECK(txn_status_table_); - const MonoTime deadline = MonoTime::Now() + timeout; unique_ptr<TxnStatusTabletContext> ctx( new TxnStatusTabletContext({ txn_status_table(), @@ -372,22 +371,21 @@ Status TxnSystemClient::CoordinateTransactionAsync(CoordinatorOpPB coordinate_tx Status TxnSystemClient::ParticipateInTransaction(const string& tablet_id, const ParticipantOpPB& participant_op, - const MonoDelta& timeout, + MonoTime deadline, Timestamp* begin_commit_timestamp, TxnMetadataPB* metadata_pb) { Synchronizer sync; - ParticipateInTransactionAsync(tablet_id, participant_op, timeout, + ParticipateInTransactionAsync(tablet_id, participant_op, deadline, sync.AsStatusCallback(), begin_commit_timestamp, metadata_pb); return sync.Wait(); } void TxnSystemClient::ParticipateInTransactionAsync(const string& tablet_id, ParticipantOpPB participant_op, - const MonoDelta& timeout, + MonoTime deadline, StatusCallback cb, Timestamp* begin_commit_timestamp, TxnMetadataPB* metadata_pb) { - MonoTime deadline = MonoTime::Now() + timeout; unique_ptr<TxnParticipantContext> ctx( new TxnParticipantContext({ client_.get(), diff --git a/src/kudu/transactions/txn_system_client.h b/src/kudu/transactions/txn_system_client.h index 4669e85..fa68356 100644 --- a/src/kudu/transactions/txn_system_client.h +++ b/src/kudu/transactions/txn_system_client.h @@ -89,10 +89,6 @@ class TxnSystemClient { return AddTxnStatusTableRangeWithClient(lower_bound, upper_bound, client_.get()); } - // TODO(awong): in the methods below with 'timeout' parameter, - // pass a deadline instead of a timeout so we can more easily - // associate it with potential user-specified deadlines. - // Attempts to create a transaction with the given 'txn_id'. // Returns an error if the transaction ID has already been taken, or if there // was an error writing to the transaction status table. In success case @@ -107,24 +103,24 @@ class TxnSystemClient { std::string& user, uint32_t* txn_keepalive_ms = nullptr, int64_t* highest_seen_txn_id = nullptr, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Attempts to register the given participant with the given transaction. // Returns an error if the transaction hasn't yet been started, or if the // 'user' isn't permitted to modify the transaction. Status RegisterParticipant(int64_t txn_id, const std::string& participant_id, const std::string& user, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Initiates committing a transaction with the given identifier. Status BeginCommitTransaction(int64_t txn_id, const std::string& user, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Aborts a transaction with the given identifier. Status AbortTransaction(int64_t txn_id, const std::string& user, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Retrieves transactions status. On success, returns Status::OK() and stores // the result status in the 'txn_status' output parameter. On failure, @@ -132,12 +128,12 @@ class TxnSystemClient { Status GetTransactionStatus(int64_t txn_id, const std::string& user, TxnStatusEntryPB* txn_status, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Send keep-alive heartbeat for the specified transaction as the given user. Status KeepTransactionAlive(int64_t txn_id, const std::string& user, - MonoDelta timeout = MonoDelta()); + MonoTime deadline = MonoTime()); // Opens the transaction status table, refreshing metadata with that from the // masters. @@ -155,12 +151,12 @@ class TxnSystemClient { // with the timestamp used to replicate the op on the participant. Status ParticipateInTransaction(const std::string& tablet_id, const tserver::ParticipantOpPB& participant_op, - const MonoDelta& timeout, + MonoTime deadline, Timestamp* begin_commit_timestamp = nullptr, tablet::TxnMetadataPB* metadata_pb = nullptr); void ParticipateInTransactionAsync(const std::string& tablet_id, tserver::ParticipantOpPB participant_op, - const MonoDelta& timeout, + MonoTime deadline, StatusCallback cb, Timestamp* begin_commit_timestamp = nullptr, tablet::TxnMetadataPB* metadata_pb = nullptr); @@ -179,7 +175,7 @@ class TxnSystemClient { client::KuduClient* client); Status CoordinateTransactionAsync(tserver::CoordinatorOpPB coordinate_txn_op, - MonoDelta timeout, + MonoTime deadline, const StatusCallback& cb, tserver::CoordinatorOpResultPB* result = nullptr); diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index 752b82b..ab6c7eb 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -1190,7 +1190,7 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask( TabletServerErrorPB::UNKNOWN_ERROR); } auto s = txn_system_client->RegisterParticipant( - txn_id, replica->tablet_id(), user, deadline - now); + txn_id, replica->tablet_id(), user, deadline); VLOG(2) << Substitute("RegisterParticipant() $0 for txn ID $1 returned $2", replica->tablet_id(), txn_id, s.ToString()); // If the transaction falls in a range that doesn't exist, re-open the @@ -1198,7 +1198,7 @@ void TSTabletManager::RegisterAndBeginParticipantTxnTask( if (s.IsNotFound()) { s = txn_system_client->OpenTxnStatusTable().AndThen([&] { return txn_system_client->RegisterParticipant( - txn_id, replica->tablet_id(), user, deadline - MonoTime::Now()); + txn_id, replica->tablet_id(), user, deadline); }); } if (PREDICT_FALSE(!s.ok())) {
