This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fd7a403 KUDU-2612: add txn memrowsets to tablet
fd7a403 is described below
commit fd7a40367a81555230eac62a1a8d1467433ed100
Author: Andrew Wong <[email protected]>
AuthorDate: Tue Sep 22 00:31:37 2020 -0700
KUDU-2612: add txn memrowsets to tablet
This patch introduces the ability to insert data into per-transaction
MRSs. When a transaction is started, it creates a new MRS to which
inserts of that transaction can be applied. This "uncommitted
transactional MRS" cannot be inserted into by any other transaction and
cannot be scanned by any scanner. Once the transaction is committed,
i.e. the FINALIZE_COMMIT op is applied for the transaction, the MRS is
moved to a separate set of "committed" transactional MRSs that are
available to be scanned, updated, and flushed. If the transaction is
aborted, the transactional MRS is excised from the list of transactional
MRSs entirely.
When inserting, updating, or scanning rows, ops must now consider all
committed transactional MRSs, in addition to the main, non-transactional
MRS, checking for row presence in any store that is "committed" and
visible to users.
Additionally, when a MRS flush is performed, rather than flushing the
main, non-transactional MRS alone, Kudu will now collect the
non-transactional MRS along with all committed transactional MRSs, and
roll DRSs using the same rowset-merging code that exists for
compactions. The new flushed data will honor each transaction's commit
timestamp rather than the per-row apply timestamps. Existing MRS-related
metrics have been updated to reflect that such flush ops now need to
consider transactional MRSs.
In order to support rebuilding these MRSs when restarting, a new boolean
field is added to the transaction metadata: 'flushed_committed_mrs',
which gets set when the transactional MRS is flushed to disk. If true,
the transaction's write ops do not need to be replayed, since the
transaction does not have an active MRS. Otherwise, the transaction's
write ops do need to be replayed, even if the transaction is persisted
in metadata as being committed.
Additionally, a transaction ID is added to the MutatedStorePB message
stored in the WAL, for the sake of replaying ops that land in committed,
transactional MRSs. Upon replay of such mutations, if the transactional
MRS being mutated is not active (i.e. it has been flushed), the
mutation op can be ignored.
When starting up, we start transactional MRSs for all transactions that
do not have 'flushed_committed_mrs' set, which indicates that the
transaction's MRS was not flushed before restarting.
NOTE: the locking story here is not complete -- multiple transactions
can write to the same rows without consequence, which violates our row
uniqueness constraints. This will be addressed in future patches.
Change-Id: I1dea4031c0acb875993352d452dc4c233e35a502
Reviewed-on: http://gerrit.cloudera.org:8080/16515
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../integration-tests/txn_participant-itest.cc | 99 ++-
src/kudu/tablet/memrowset.cc | 19 +-
src/kudu/tablet/memrowset.h | 4 +-
src/kudu/tablet/metadata.proto | 5 +
src/kudu/tablet/ops/write_op.cc | 59 +-
src/kudu/tablet/ops/write_op.h | 39 +-
src/kudu/tablet/row_op.cc | 8 +
src/kudu/tablet/row_op.h | 2 +
src/kudu/tablet/tablet.cc | 392 ++++++++---
src/kudu/tablet/tablet.h | 77 ++-
src/kudu/tablet/tablet.proto | 6 +
src/kudu/tablet/tablet_bootstrap.cc | 28 +-
src/kudu/tablet/tablet_metadata.cc | 39 +-
src/kudu/tablet/tablet_metadata.h | 19 +-
src/kudu/tablet/tablet_replica-test-base.cc | 3 +
src/kudu/tablet/txn_metadata.h | 19 +-
src/kudu/tablet/txn_participant-test.cc | 717 +++++++++++++++++++--
src/kudu/tablet/txn_participant.cc | 10 +
src/kudu/tablet/txn_participant.h | 13 +-
src/kudu/tserver/tablet_service.cc | 3 +-
src/kudu/tserver/tserver.proto | 3 +
src/kudu/util/locks.h | 7 +-
23 files changed, 1351 insertions(+), 221 deletions(-)
diff --git a/src/kudu/integration-tests/CMakeLists.txt
b/src/kudu/integration-tests/CMakeLists.txt
index d920ced..98d0069 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -36,6 +36,7 @@ set(INTEGRATION_TESTS_SRCS
add_library(itest_util ${INTEGRATION_TESTS_SRCS})
target_link_libraries(itest_util
+ tablet_test_util
tserver_proto
tserver_test_util
master_proto
diff --git a/src/kudu/integration-tests/txn_participant-itest.cc
b/src/kudu/integration-tests/txn_participant-itest.cc
index 1d41573..81be4c0 100644
--- a/src/kudu/integration-tests/txn_participant-itest.cc
+++ b/src/kudu/integration-tests/txn_participant-itest.cc
@@ -30,8 +30,12 @@
#include <gtest/gtest.h>
#include "kudu/clock/clock.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
#include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/ref_counted.h"
@@ -42,8 +46,10 @@
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/tablet/txn_participant.h"
@@ -52,6 +58,7 @@
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -78,8 +85,8 @@ using kudu::tablet::TabletReplica;
using kudu::tablet::Txn;
using kudu::tablet::TxnParticipant;
using kudu::tserver::ParticipantOpPB;
-using kudu::tserver::ParticipantRequestPB;
using kudu::tserver::ParticipantResponsePB;
+using kudu::tserver::WriteRequestPB;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -158,8 +165,10 @@ class TxnParticipantITest : public KuduTest {
}
// Creates a single-tablet replicated table.
- void SetUpTable(string* table_name = nullptr) {
+ void SetUpTable(string* table_name = nullptr,
+ TestWorkload::WritePattern pattern =
TestWorkload::INSERT_RANDOM_ROWS) {
TestWorkload w(cluster_.get());
+ w.set_write_pattern(pattern);
w.Setup();
w.Start();
while (w.rows_inserted() < 1) {
@@ -169,6 +178,7 @@ class TxnParticipantITest : public KuduTest {
*table_name = w.table_name();
}
w.StopAndJoin();
+ initial_row_count_ = w.rows_inserted();
}
// Quiesces servers such that the tablet server at index 'ts_idx' is set up
@@ -192,6 +202,7 @@ class TxnParticipantITest : public KuduTest {
protected:
unique_ptr<InternalMiniCluster> cluster_;
unordered_map<string, TServerDetails*> ts_map_;
+ int initial_row_count_;
};
// Test that participant ops only applied to followers via replication from
@@ -302,6 +313,7 @@ TEST_P(ParticipantCopyITest, TestCopyParticipantOps) {
ASSERT_TRUE(leader_replica->tablet_metadata()->HasTxnMetadata(i));
}
TestWorkload w(cluster_.get());
+ w.set_already_present_allowed(true);
w.set_table_name(table_name);
w.Setup();
w.Start();
@@ -452,49 +464,78 @@ class TxnParticipantElectionStormITest : public
TxnParticipantITest {
opts.num_tablet_servers = 3;
cluster_.reset(new InternalMiniCluster(env_, std::move(opts)));
ASSERT_OK(cluster_->Start());
- NO_FATALS(SetUpTable());
+ // We'll continue writing from where we left off, so write in order.
+ NO_FATALS(SetUpTable(nullptr, TestWorkload::INSERT_SEQUENTIAL_ROWS));
}
};
TEST_F(TxnParticipantElectionStormITest, TestFrequentElections) {
vector<TabletReplica*> replicas;
+ const string tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0];
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- auto* ts = cluster_->mini_tablet_server(i);
- const auto& tablets = ts->ListTablets();
scoped_refptr<TabletReplica> r;
- ASSERT_TRUE(ts->server()->tablet_manager()->LookupTablet(tablets[0], &r));
+
ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->tablet_manager()->LookupTablet(
+ tablet_id, &r));
replicas.emplace_back(r.get());
}
+
+ const auto write = [&] (int64_t txn_id, int row_id, TabletReplica* replica) {
+ WriteRequestPB req;
+ req.set_txn_id(txn_id);
+ req.set_tablet_id(tablet_id);
+ const auto& schema = GetSimpleTestSchema();
+ RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
+ KuduPartialRow row(&schema);
+ RETURN_NOT_OK(row.SetInt32(0, row_id));
+ RETURN_NOT_OK(row.SetInt32(1, row_id));
+ RowOperationsPBEncoder enc(req.mutable_row_operations());
+ enc.Add(RowOperationsPB::INSERT, row);
+ return tablet::TabletReplicaTestBase::ExecuteWrite(replica, req);
+ };
+
// Inject latency so elections become more frequent and wait a bit for our
// latency injection to kick in.
FLAGS_raft_enable_pre_election = false;
FLAGS_consensus_inject_latency_ms_in_notifications = 1.5 *
FLAGS_raft_heartbeat_interval_ms;;
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 2));
- // Send a participant op to all replicas concurrently. Leaders should attempt
- // to replicate, and followers should immediately reject the request. There
- // will be a real chance that the leader op will not be successfully
- // replicated because of an election change.
constexpr const int kNumTxns = 10;
vector<ParticipantOpPB::ParticipantOpType>
last_successful_ops_per_txn(kNumTxns);
+ vector<int> num_successful_writes_per_txn(kNumTxns);
for (int txn_id = 0; txn_id < kNumTxns; txn_id++) {
- ParticipantOpPB::ParticipantOpType last_successful_op =
ParticipantOpPB::UNKNOWN;;
- for (const auto& op : kCommitSequence) {
- vector<thread> threads;
- vector<Status> statuses(cluster_->num_tablet_servers(),
Status::Incomplete(""));
- for (int r = 0; r < cluster_->num_tablet_servers(); r++) {
- threads.emplace_back([&, r] {
- ParticipantResponsePB resp;
- Status s = CallParticipantOp(replicas[r], txn_id, op,
kDummyCommitTimestamp, &resp);
- if (resp.has_error()) {
- s = StatusFromPB(resp.error().status());
+ // Send some writes in the background and keep track of the number of
+ // successful rows.
+ CountDownLatch prt_ops_done(1);
+ thread writers_thread([&] {
+ int num_successful_writes = 0;
+ for (int cur_row = initial_row_count_; prt_ops_done.count() > 0;
cur_row++) {
+ vector<Status> statuses(cluster_->num_tablet_servers());
+ vector<thread> write_threads;
+ for (int r = 0; r < cluster_->num_tablet_servers(); r++) {
+ write_threads.emplace_back([&, r, cur_row] {
+ statuses[r] = write(txn_id, cur_row, replicas[r]);
+ });
+ }
+ for (auto& wt : write_threads) {
+ wt.join();
+ }
+ for (const auto& s : statuses) {
+ if (s.ok()) {
+ num_successful_writes++;
+ break;
}
- statuses[r] = s;
- });
- }
- for (auto& t : threads) {
- t.join();
+ }
}
+ num_successful_writes_per_txn[txn_id] = num_successful_writes;
+ });
+
+ // Send a participant op to all replicas concurrently. Leaders should
+ // attempt to replicate, and followers should immediately reject the
+ // request. There will be a real chance that the leader op will not be
+ // successfully replicated because of a leadership change.
+ ParticipantOpPB::ParticipantOpType last_successful_op =
ParticipantOpPB::UNKNOWN;;
+ for (const auto& op : kCommitSequence) {
+ vector<Status> statuses = RunOnReplicas(replicas, txn_id, op);
// If this op succeeded on any replica, keep track of it -- it indicates
// what the final state of each transaction should be.
for (const auto& s : statuses) {
@@ -504,9 +545,12 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
}
last_successful_ops_per_txn[txn_id] = last_successful_op;
}
+ prt_ops_done.CountDown();
+ writers_thread.join();
}
// Validate that each replica has each transaction in the appropriate state.
vector<TxnParticipant::TxnEntry> expected_txns;
+ int expected_rows = initial_row_count_;
for (int txn_id = 0; txn_id < kNumTxns; txn_id++) {
const auto& last_successful_op = last_successful_ops_per_txn[txn_id];
if (last_successful_op == ParticipantOpPB::UNKNOWN) {
@@ -521,6 +565,7 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
expected_state = Txn::kCommitInProgress;
break;
case ParticipantOpPB::FINALIZE_COMMIT:
+ expected_rows += num_successful_writes_per_txn[txn_id];
expected_state = Txn::kCommitted;
break;
default:
@@ -540,6 +585,9 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
TxnsAsString(expected_txns),
TxnsAsString(actual_txns));
});
}
+ for (int i = 0; i < replicas.size(); i++) {
+ ASSERT_EQ(expected_rows, replicas[i]->CountLiveRowsNoFail());
+ }
// Now restart the cluster and ensure the transaction state is restored.
cluster_->Shutdown();
@@ -569,6 +617,7 @@ TEST_F(TxnParticipantElectionStormITest,
TestFrequentElections) {
<< Substitute("Expected: $0,\nActual: $1",
TxnsAsString(expected_txns),
TxnsAsString(actual_txns_not_initting));
+ ASSERT_EQ(expected_rows, r->CountLiveRowsNoFail());
}
}
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 3f1796f..55a3f1e 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -135,7 +135,8 @@ MemRowSet::MemRowSet(int64_t id,
tree_(arena_),
debug_insert_count_(0),
debug_update_count_(0),
- anchorer_(log_anchor_registry, Substitute("MemRowSet-$0", id_)),
+ anchorer_(log_anchor_registry, Substitute("MemRowSet-$0$1", id_, txn_id_ ?
+ Substitute("(txn_id=$0)",
*txn_id) : "")),
has_been_compacted_(false),
live_row_count_(0) {
CHECK(schema.has_column_ids());
@@ -274,6 +275,9 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
MemStoreTargetPB* target = result->add_mutated_stores();
target->set_mrs_id(id_);
+ if (txn_id_) {
+ target->set_rs_txn_id(*txn_id_);
+ }
}
stats->mrs_consulted++;
@@ -699,14 +703,21 @@ Status MemRowSet::Iterator::GetCurrentRow(RowBlockRow*
dst_row,
Mutation** redo_head,
Arena* mutation_arena,
Timestamp* insertion_timestamp) {
-
DCHECK(boost::none == opts_.snap_to_exclude);
DCHECK(redo_head != nullptr);
// Get the row from the MemRowSet. It may have a different schema from the
iterator projection.
MRSRow src_row = GetCurrentRow();
-
- *insertion_timestamp = src_row.insertion_timestamp();
+ const auto& mrs_txn_id = memrowset_->txn_id();
+ if (mrs_txn_id) {
+ // NOTE: we currently only support flushing committed MRSs.
+ const auto& txn_meta = memrowset_->txn_metadata();
+ CHECK(opts_.snap_to_include.IsCommitted(*txn_meta.get()));
+ CHECK(boost::none != txn_meta->commit_timestamp());
+ *insertion_timestamp = *txn_meta->commit_timestamp();
+ } else {
+ *insertion_timestamp = src_row.insertion_timestamp();
+ }
// Project the RowChangeList if required
*redo_head = src_row.acquire_redo_head();
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 3e5ebfa..e5d2b30 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -38,6 +38,7 @@
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/concurrent_btree.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
@@ -383,7 +384,8 @@ class MemRowSet : public RowSet,
virtual Status DebugDump(std::vector<std::string> *lines) override;
std::string ToString() const override {
- return "memrowset";
+ return strings::Substitute("memrowset$0",
+ txn_id_ ? strings::Substitute("(txn_id=$0)", *txn_id_) : "");
}
// Mark the memrowset as frozen. See CBTree::Freeze()
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 5cbfa14..0412128 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -103,6 +103,11 @@ message TxnMetadataPB {
// this avoids reading dirty, uncommitted rows.
optional int64 commit_mvcc_op_timestamp = 3;
+ // Whether or not this transaction has flushed its MRS after committing. If
+ // set to true, Kudu should not create an MRS for this transaction when
+ // bootstrapping.
+ optional bool flushed_committed_mrs = 4;
+
// TODO(awong): add an owner field to this for uncommitted transactions.
}
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 4cc7fdf..e62e3cf 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -53,6 +53,7 @@
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/txn_participant.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
@@ -170,12 +171,35 @@ Status WriteOp::Prepare() {
Tablet* tablet = state()->tablet_replica()->tablet();
- Status s = tablet->DecodeWriteOperations(&client_schema, state());
+ // Before taking any other locks, acquire the transaction state lock and
+ // ensure it is open.
+ Status s;
+ if (state_->request()->has_txn_id()) {
+ s = tablet->AcquireTxnLock(state_->request()->txn_id(), state());
+ if (!s.ok()) {
+ state()->completion_callback()->set_error(s,
TabletServerErrorPB::TXN_ILLEGAL_STATE);
+ return s;
+ }
+ }
+
+ s = tablet->DecodeWriteOperations(&client_schema, state());
if (!s.ok()) {
// TODO(unknown): is MISMATCHED_SCHEMA always right here? probably not.
state()->completion_callback()->set_error(s,
TabletServerErrorPB::MISMATCHED_SCHEMA);
return s;
}
+  // Only after decoding rows, check that only supported operations make it
+  // through. Transactional writes may currently only insert.
+  if (state_->request()->has_txn_id()) {
+    for (const auto& op : state_->row_ops()) {
+      const auto& op_type = op->decoded_op.type;
+      if (op_type != RowOperationsPB::INSERT &&
+          op_type != RowOperationsPB::INSERT_IGNORE) {
+        // 's' is OK at this point (decoding succeeded), so build the real
+        // error first: the status reported to the client must match the
+        // status returned to the caller.
+        s = Status::NotSupported("transactions may only insert");
+        state()->completion_callback()->set_error(s, TabletServerErrorPB::INVALID_MUTATION);
+        return s;
+      }
+    }
+  }
// Authorize the request if needed.
const auto& authz_context = state()->authz_context();
@@ -340,6 +364,11 @@ void WriteOpState::set_tablet_components(
tablet_components_ = components;
}
+void WriteOpState::set_txn_rowsets(const scoped_refptr<TxnRowSets>& rowsets) {
+ DCHECK(!txn_rowsets_) << "Already set";
+ txn_rowsets_ = rowsets;
+}
+
void WriteOpState::AcquireSchemaLock(rw_semaphore* schema_lock) {
TRACE("Acquiring schema lock in shared mode");
shared_lock<rw_semaphore> temp(*schema_lock);
@@ -347,6 +376,19 @@ void WriteOpState::AcquireSchemaLock(rw_semaphore*
schema_lock) {
TRACE("Acquired schema lock");
}
+Status WriteOpState::AcquireTxnLockCheckOpen(scoped_refptr<Txn> txn) {
+  // Take the transaction's lock in shared mode into a local first, so that it
+  // is dropped automatically if the state check below fails.
+  shared_lock<rw_semaphore> held;
+  txn->AcquireReadLock(&held);
+
+  // With the lock held, verify the transaction can still be written to.
+  const auto cur_state = txn->state();
+  if (PREDICT_FALSE(cur_state != Txn::kOpen)) {
+    const auto msg = Substitute("txn $0 is not open: $1",
+                                txn->txn_id(), Txn::StateToString(cur_state));
+    return Status::InvalidArgument(msg);
+  }
+
+  // Success: hand the lock and the transaction reference over to this op.
+  txn_lock_.swap(held);
+  txn_ = std::move(txn);
+  return Status::OK();
+}
+
void WriteOpState::ReleaseSchemaLock() {
shared_lock<rw_semaphore> temp;
schema_lock_.swap(temp);
@@ -485,7 +527,20 @@ void WriteOpState::AcquireRowLocks(LockManager*
lock_manager) {
rows_lock_ = ScopedRowLock(lock_manager, this, keys,
LockManager::LOCK_EXCLUSIVE);
}
-void WriteOpState::ReleaseRowLocks() { rows_lock_.Release(); }
+void WriteOpState::ReleaseRowLocks() {
+ rows_lock_.Release();
+}
+
+// Releases the transaction state lock and drops this op's reference to the
+// transaction, if it holds one.
+void WriteOpState::ReleaseTxnLock() {
+  // Move the lock into a local; it is released when 'temp' goes out of scope.
+  shared_lock<rw_semaphore> temp;
+  txn_lock_.swap(temp);
+  // 'txn_' is only set for transactional writes, so there may be nothing to
+  // drop here.
+  if (txn_.get() != nullptr) {
+    // It's possible we took a reference to a transaction that failed to start
+    // (e.g. because we changed leadership before the BEGIN_TXN could complete).
+    // If that's the case, clear its state in the participant. Capture the ID
+    // before dropping our reference: 'txn_' must not be dereferenced after
+    // reset().
+    const auto txn_id = txn_->txn_id();
+    txn_.reset();
+    tablet_replica()->tablet()->txn_participant()->ClearIfInitFailed(txn_id);
+  }
+  TRACE("Released txn lock");
+}
WriteOpState::~WriteOpState() {
Reset();
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index 771ade7..3616b18 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -18,6 +18,7 @@
#pragma once
#include <cstddef>
+#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
@@ -31,8 +32,8 @@
#include "kudu/consensus/consensus.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
-#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/lock_manager.h"
+#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/bitset.h"
@@ -53,7 +54,9 @@ namespace tablet {
class ScopedOp;
class TabletReplica;
+class Txn;
class TxResultPB;
+struct TxnRowSets;
struct RowOp;
struct TabletComponents;
@@ -139,6 +142,10 @@ class WriteOpState : public OpState {
return response_;
}
+ boost::optional<int64_t> txn_id() const {
+ return request_->has_txn_id() ? boost::make_optional(request_->txn_id()) :
boost::none;
+ }
+
// Returns the state associated with authorizing this op, or 'none' if no
// authorization is necessary.
const boost::optional<WriteAuthorizationContext>& authz_context() const {
@@ -156,6 +163,9 @@ class WriteOpState : public OpState {
// in-memory edits.
void set_tablet_components(const scoped_refptr<const TabletComponents>&
components);
+ // Set the txn rowsets that this op will write into.
+ void set_txn_rowsets(const scoped_refptr<TxnRowSets>& rowsets);
+
// Take a shared lock on the given schema lock.
// This is required prior to decoding rows so that the schema does
// not change in between performing the projection and applying
@@ -165,6 +175,11 @@ class WriteOpState : public OpState {
// Acquire row locks for all of the rows in this Write.
void AcquireRowLocks(LockManager* lock_manager);
+ // Acquires the lock on the given transaction, setting 'txn_' and
+ // 'txn_lock_', which must be freed upon finishing this op. Checks if the
+ // transaction is available to be written to, returning an error if not.
+ Status AcquireTxnLockCheckOpen(scoped_refptr<Txn> txn);
+
// Release the already-acquired schema lock.
void ReleaseSchemaLock();
@@ -184,6 +199,10 @@ class WriteOpState : public OpState {
return tablet_components_.get();
}
+ const TxnRowSets* txn_rowsets() const {
+ return txn_rowsets_.get();
+ }
+
// Notifies the MVCC manager that this operation is about to start applying
// its in-memory edits. After this method is called, the op _must_
FinishApplying()
// within a bounded amount of time (there may be other threads blocked on
@@ -229,6 +248,9 @@ class WriteOpState : public OpState {
// Releases all the row locks acquired by this op.
void ReleaseRowLocks();
+ // Releases the transaction state lock.
+ void ReleaseTxnLock();
+
// Reset the RPC request, response, and row_ops_ (which refers to data
// from the request).
void ResetRpcFields();
@@ -261,9 +283,15 @@ class WriteOpState : public OpState {
// The MVCC op, set up during PREPARE phase
std::unique_ptr<ScopedOp> mvcc_op_;
- // The tablet components, acquired at the same time as mvcc_op_ is set.
+ // The tablet components, acquired at the same time as mvcc_op_ is set. This
+ // is maintained in case any of the component's rowsets get compacted away,
+ // ensuring we retain a reference to them for the duration of this op.
scoped_refptr<const TabletComponents> tablet_components_;
+ // The uncommitted transaction rowsets to write to if this write op is a part
+ // of a transaction, acquired at the same time as mvcc_op_ is set.
+ scoped_refptr<TxnRowSets> txn_rowsets_;
+
// A lock held on the tablet's schema. Prevents concurrent schema change
// from racing with a write.
shared_lock<rw_semaphore> schema_lock_;
@@ -276,6 +304,13 @@ class WriteOpState : public OpState {
// Lock that protects access to various fields of WriteOpState.
mutable simple_spinlock op_state_lock_;
+ // Transaction to which this write op writes, if any.
+ scoped_refptr<Txn> txn_;
+
+ // Lock protecting the transaction's state, ensuring it remains open for the
+ // duration of this write.
+ shared_lock<rw_semaphore> txn_lock_;
+
DISALLOW_COPY_AND_ASSIGN(WriteOpState);
};
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index e7b955e..6a2e661 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -55,6 +55,14 @@ void RowOp::SetInsertSucceeded(int mrs_id) {
result->add_mutated_stores()->set_mrs_id(mrs_id);
}
+// Records a successful insert into the MRS with ID 'mrs_id' that belongs to
+// transaction 'txn_id'. A result must not have been set already.
+void RowOp::SetInsertSucceeded(int64_t txn_id, int mrs_id) {
+  DCHECK(!result) << SecureDebugString(*result);
+  result = google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
+  // Tag the mutated store with both the owning transaction and the MRS ID.
+  auto* store = result->add_mutated_stores();
+  store->set_rs_txn_id(txn_id);
+  store->set_mrs_id(mrs_id);
+}
+
void RowOp::SetErrorIgnored() {
DCHECK(!result) << SecureDebugString(*result);
result =
google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 19a9092..5592eae 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <cstdint>
#include <string>
#include "kudu/common/row_operations.h"
@@ -46,6 +47,7 @@ struct RowOp {
// Only one of the following four functions must be called, at most once.
void SetFailed(const Status& s);
void SetInsertSucceeded(int mrs_id);
+ void SetInsertSucceeded(int64_t txn_id, int mrs_id);
void SetErrorIgnored();
// REQUIRES: result must be allocated from the same protobuf::Arena
associated
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 4badfb0..f3193bc 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -23,6 +23,7 @@
#include <memory>
#include <mutex>
#include <ostream>
+#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@@ -54,6 +55,7 @@
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/casts.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
@@ -74,6 +76,7 @@
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_mm_ops.h"
+#include "kudu/tablet/txn_metadata.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/bitmap.h"
@@ -240,8 +243,11 @@ static CompactionPolicy *CreateCompactionPolicy() {
////////////////////////////////////////////////////////////
TabletComponents::TabletComponents(shared_ptr<MemRowSet> mrs,
+ std::vector<std::shared_ptr<MemRowSet>>
txn_mrss,
shared_ptr<RowSetTree> rs_tree)
- : memrowset(std::move(mrs)), rowsets(std::move(rs_tree)) {}
+ : memrowset(std::move(mrs)),
+ txn_memrowsets(std::move(txn_mrss)),
+ rowsets(std::move(rs_tree)) {}
////////////////////////////////////////////////////////////
// Tablet
@@ -318,7 +324,8 @@ Tablet::~Tablet() {
CHECK_EQ(expected_state, _local_state); \
} while (0)
-Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids) {
+Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
+ const unordered_set<int64_t>& txn_ids_with_mrs) {
TRACE_EVENT0("tablet", "Tablet::Open");
RETURN_IF_STOPPED_OR_CHECK_STATE(kInitialized);
@@ -364,9 +371,27 @@ Status Tablet::Open(const unordered_set<int64_t>&
in_flight_txn_ids) {
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
+
+ // Create MRSs for any in-flight transactions there might be.
+ // NOTE: we may also have to create MRSs for committed transactions; that
+ // will happen upon bootstrapping.
+ std::unordered_map<int64_t, scoped_refptr<TxnRowSets>>
uncommitted_rs_by_txn_id;
+ const auto txn_meta_by_id = metadata_->GetTxnMetadata();
+ for (const auto& txn_id : txn_ids_with_mrs) {
+ shared_ptr<MemRowSet> txn_mrs;
+ // NOTE: we are able to FindOrDie() on these IDs because
+ // 'txn_ids_with_mrs' is a subset of the transaction IDs known by the
+ // metadata.
+ RETURN_NOT_OK(MemRowSet::Create(0, *schema(), txn_id,
FindOrDie(txn_meta_by_id, txn_id),
+ log_anchor_registry_.get(),
+ mem_trackers_.tablet_tracker,
+ &txn_mrs));
+ EmplaceOrDie(&uncommitted_rs_by_txn_id, txn_id, new
TxnRowSets(std::move(txn_mrs)));
+ }
std::lock_guard<rw_spinlock> lock(component_lock_);
components_.reset(new TabletComponents(
- std::move(new_mrs), std::move(new_rowset_tree)));
+ std::move(new_mrs), {}, std::move(new_rowset_tree)));
+ uncommitted_rowsets_by_txn_id_ = std::move(uncommitted_rs_by_txn_id);
}
// Compute the initial average rowset height.
@@ -551,6 +576,22 @@ Status Tablet::AcquireRowLocks(WriteOpState* op_state) {
return Status::OK();
}
+// Looks up the in-flight transaction 'txn_id' and has 'op_state' take its
+// lock. Returns an error if the participant has no such transaction.
+Status Tablet::AcquireTxnLock(int64_t txn_id, WriteOpState* op_state) {
+  auto txn = txn_participant_.GetTransaction(txn_id);
+  if (!txn) {
+    // Nothing in flight and nothing in the metadata: the transaction is
+    // unknown to this tablet.
+    if (!metadata_->HasTxnMetadata(txn_id)) {
+      return Status::NotFound(Substitute("txn $0 not found on tablet $1",
+                                         txn_id, tablet_id()));
+    }
+    // Not in flight, but the metadata still knows about it.
+    // TODO(awong): IllegalState or Aborted seem more intuitive, but clients
+    // currently retry on both of those errors, assuming the error comes from
+    // Raft followers complaining about the lack of leadership.
+    return Status::InvalidArgument(Substitute("txn $0 is not open", txn_id));
+  }
+  return op_state->AcquireTxnLockCheckOpen(std::move(txn));
+}
+
Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
bool contains_row;
RETURN_NOT_OK(metadata_->partition_schema().PartitionContainsRow(metadata_->partition(),
@@ -701,11 +742,49 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext*
io_context,
Timestamp ts = op_state->timestamp();
ConstContiguousRow row(schema(), op->decoded_op.row_data);
- // TODO: the Insert() call below will re-encode the key, which is a
+ // TODO(todd): the Insert() call below will re-encode the key, which is a
// waste. Should pass through the KeyProbe structure perhaps.
+ const auto& txn_id = op_state->txn_id();
+ if (txn_id) {
+ // Only inserts are supported -- this is guaranteed in the prepare phase.
+ DCHECK(op_type == RowOperationsPB::INSERT ||
+ op_type == RowOperationsPB::INSERT_IGNORE);
+ // The previous presence checks only checked disk rowsets and committed txn
+ // memrowsets. Before inserting into this transaction's MRS, ensure the
+ // row doesn't already exist in the main MRS.
+ bool present_in_main_mrs = false;
+ RETURN_NOT_OK(comps->memrowset->CheckRowPresent(*op->key_probe, io_context,
+ &present_in_main_mrs,
stats));
+ if (present_in_main_mrs) {
+ if (op_type == RowOperationsPB::INSERT_IGNORE) {
+ op->SetErrorIgnored();
+ return Status::OK();
+ }
+ Status s = Status::AlreadyPresent("key already present");
+ if (metrics_) {
+ metrics_->insertions_failed_dup_key->Increment();
+ }
+ op->SetFailed(s);
+ return s;
+ }
+ const auto* txn_rowsets = DCHECK_NOTNULL(op_state->txn_rowsets());
+ Status s = txn_rowsets->memrowset->Insert(ts, row, op_state->op_id());
+ // TODO(awong): once we support transactional updates, update this to check
+ // for AlreadyPresent statuses.
+ if (PREDICT_TRUE(s.ok())) {
+ op->SetInsertSucceeded(*txn_id, txn_rowsets->memrowset->mrs_id());
+ } else if (s.IsAlreadyPresent() && op_type ==
RowOperationsPB::INSERT_IGNORE) {
+ op->SetErrorIgnored();
+ return Status::OK();
+ } else {
+ op->SetFailed(s);
+ }
+ return s;
+ }
+
// Now try to op into memrowset. The memrowset itself will return
- // AlreadyPresent if it has already been oped there.
+ // AlreadyPresent if it has already been inserted there.
Status s = comps->memrowset->Insert(ts, row, op_state->op_id());
if (s.ok()) {
op->SetInsertSucceeded(comps->memrowset->mrs_id());
@@ -787,6 +866,11 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext*
io_context,
vector<RowSet*> Tablet::FindRowSetsToCheck(const RowOp* op,
const TabletComponents* comps) {
vector<RowSet*> to_check;
+ for (const auto& txn_mrs : comps->txn_memrowsets) {
+ to_check.emplace_back(txn_mrs.get());
+ uint64_t rows;
+ txn_mrs->CountLiveRows(&rows);
+ }
if (PREDICT_TRUE(!op->orig_result_from_log)) {
// TODO(yingchun): could iterate the rowsets in a smart order
// based on recent statistics - eg if a rowset is getting
@@ -876,6 +960,14 @@ Status Tablet::MutateRowUnlocked(const IOContext*
io_context,
void Tablet::StartApplying(WriteOpState* op_state) {
shared_lock<rw_spinlock> l(component_lock_);
+
+ const auto txn_id = op_state->txn_id();
+ if (txn_id) {
+ auto txn_rowsets = FindPtrOrNull(uncommitted_rowsets_by_txn_id_, *txn_id);
+ // The provisional rowset for this transaction should have been created
+ // when applying the BEGIN_TXN op.
+ op_state->set_txn_rowsets(txn_rowsets);
+ }
op_state->StartApplying();
op_state->set_tablet_components(components_);
}
@@ -891,6 +983,26 @@ void Tablet::StartApplying(ParticipantOpState* op_state) {
}
}
+// Creates a new MemRowSet for transaction 'txn_id' (passing 'txn_meta' to
+// MemRowSet::Create()) and registers it in 'uncommitted_rowsets_by_txn_id_'.
+// If rowsets are already registered for 'txn_id', the freshly created
+// MemRowSet is discarded and the existing entry is left untouched.
+void Tablet::CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta) {
+  shared_ptr<MemRowSet> new_mrs;
+  CHECK_OK(MemRowSet::Create(0, *schema(), txn_id, std::move(txn_meta),
+                             log_anchor_registry_.get(),
+                             mem_trackers_.tablet_tracker,
+                             &new_mrs));
+  scoped_refptr<TxnRowSets> rowsets(new TxnRowSets(std::move(new_mrs)));
+  {
+    // Inserting into 'uncommitted_rowsets_by_txn_id_' mutates state guarded by
+    // 'component_lock_', so the lock must be held exclusively. Holding it in
+    // shared mode would let this insert race with StartApplying(), which
+    // looks up the same map under a shared lock.
+    std::lock_guard<rw_spinlock> l(component_lock_);
+    // TODO(awong): can we ever get here?
+    if (ContainsKey(uncommitted_rowsets_by_txn_id_, txn_id)) {
+      return;
+    }
+    // We are guaranteed to succeed here because this is only ever called by
+    // the BEGIN_TXN op, which is only applied once per transaction
+    // participant.
+    EmplaceOrDie(&uncommitted_rowsets_by_txn_id_, txn_id, std::move(rowsets));
+  }
+}
+
Status Tablet::BulkCheckPresence(const IOContext* io_context, WriteOpState*
op_state) {
int num_ops = op_state->row_ops().size();
@@ -946,10 +1058,33 @@ Status Tablet::BulkCheckPresence(const IOContext*
io_context, WriteOpState* op_s
keys[i] = keys_and_indexes[i].first;
}
- // Actually perform the presence checks. We use the "bulk query"
functionality
- // provided by RowSetTree::ForEachRowSetContainingKeys(), which yields
results
- // via a callback, with grouping guarantees that callbacks for the same
RowSet
- // will be grouped together with increasing query keys.
+ // Check the presence in the committed transaction memrowsets.
+ // NOTE: we don't have to check whether the row is in the main memrowset; if
+ // it does exist there, we'll discover that when we try to insert into it.
+ const TabletComponents* comps =
DCHECK_NOTNULL(op_state->tablet_components());
+ for (auto& key_and_index : keys_and_indexes) {
+ int idx = key_and_index.second;
+ RowOp* op = row_ops_base[idx];
+ if (op->present_in_rowset) {
+ continue;
+ }
+ bool present = false;
+ for (const auto& mrs : comps->txn_memrowsets) {
+ RETURN_NOT_OK_PREPEND(mrs->CheckRowPresent(*op->key_probe, io_context,
&present,
+ op_state->mutable_op_stats(idx)),
+ Substitute("Tablet $0 failed to check row presence
for op $1",
+ tablet_id(),
op->ToString(key_schema_)));
+ if (present) {
+ op->present_in_rowset = mrs.get();
+ break;
+ }
+ }
+ }
+
+ // Perform the presence checks on the other rowsets. We use the "bulk query"
+ // functionality provided by RowSetTree::ForEachRowSetContainingKeys(), which
+ // yields results via a callback, with grouping guarantees that callbacks for
+ // the same RowSet will be grouped together with increasing query keys.
//
// We want to process each such "group" (set of subsequent calls for the same
// RowSet) one at a time. So, the callback itself aggregates results into
@@ -994,8 +1129,6 @@ Status Tablet::BulkCheckPresence(const IOContext*
io_context, WriteOpState* op_s
}
pending_group.clear();
};
-
- const TabletComponents* comps =
DCHECK_NOTNULL(op_state->tablet_components());
comps->rowsets->ForEachRowSetContainingKeys(
keys,
[&](RowSet* rs, int i) {
@@ -1038,6 +1171,8 @@ void Tablet::BeginTransaction(Txn* txn, const OpId&
op_id) {
Substitute("BEGIN_TXN-$0-$1", txn->txn_id(), txn)));
anchor->AnchorIfMinimum(op_id.index());
metadata_->AddTxnMetadata(txn->txn_id(), std::move(anchor));
+ const auto& txn_id = txn->txn_id();
+ CreateTxnRowSets(txn_id, FindOrDie(metadata_->GetTxnMetadata(), txn_id));
txn->BeginTransaction();
}
@@ -1053,15 +1188,35 @@ void Tablet::CommitTransaction(Txn* txn, Timestamp
commit_ts, const OpId& op_id)
unique_ptr<MinLogIndexAnchorer> anchor(new
MinLogIndexAnchorer(log_anchor_registry_.get(),
Substitute("FINALIZE_COMMIT-$0-$1", txn->txn_id(), txn)));
anchor->AnchorIfMinimum(op_id.index());
- metadata_->AddCommitTimestamp(txn->txn_id(), commit_ts, std::move(anchor));
+
+ const auto& txn_id = txn->txn_id();
+ metadata_->AddCommitTimestamp(txn_id, commit_ts, std::move(anchor));
+ CommitTxnRowSets(txn_id);
txn->FinalizeCommit(commit_ts.value());
}
+// Moves the MemRowSet of transaction 'txn_id' out of the uncommitted map and
+// appends it to the committed transactional MemRowSets of a new
+// TabletComponents instance. Crashes if no uncommitted rowsets are registered
+// for 'txn_id'.
+void Tablet::CommitTxnRowSets(int64_t txn_id) {
+  std::lock_guard<rw_spinlock> lock(component_lock_);
+  auto committing = EraseKeyReturnValuePtr(&uncommitted_rowsets_by_txn_id_, txn_id);
+  CHECK(committing);
+
+  // 'components_' is replaced wholesale rather than modified in place, so
+  // start from a copy of the currently committed transactional MRSs.
+  vector<shared_ptr<MemRowSet>> txn_mrss(components_->txn_memrowsets);
+  txn_mrss.emplace_back(committing->memrowset);
+  components_ = new TabletComponents(components_->memrowset,
+                                     std::move(txn_mrss),
+                                     components_->rowsets);
+}
+
void Tablet::AbortTransaction(Txn* txn, const OpId& op_id) {
unique_ptr<MinLogIndexAnchorer> anchor(new
MinLogIndexAnchorer(log_anchor_registry_.get(),
Substitute("ABORT_TXN-$0-$1", txn->txn_id(), txn)));
anchor->AnchorIfMinimum(op_id.index());
- metadata_->AbortTransaction(txn->txn_id(), std::move(anchor));
+ const auto& txn_id = txn->txn_id();
+ metadata_->AbortTransaction(txn_id, std::move(anchor));
+ {
+ std::lock_guard<rw_spinlock> lock(component_lock_);
+ auto txn_rowsets = EraseKeyReturnValuePtr(&uncommitted_rowsets_by_txn_id_,
txn_id);
+ CHECK(txn_rowsets);
+ }
txn->AbortTransaction();
}
@@ -1114,7 +1269,8 @@ Status Tablet::ApplyRowOperation(const IOContext*
io_context,
// If we were unable to check rowset presence in batch (e.g. because we are
processing
// a batch which contains some duplicate keys) we need to do so now.
if (PREDICT_FALSE(!row_op->checked_present)) {
- vector<RowSet *> to_check = FindRowSetsToCheck(row_op,
op_state->tablet_components());
+ vector<RowSet *> to_check = FindRowSetsToCheck(row_op,
+
op_state->tablet_components());
for (RowSet *rowset : to_check) {
bool present = false;
RETURN_NOT_OK_PREPEND(rowset->CheckRowPresent(*row_op->key_probe,
io_context,
@@ -1204,7 +1360,9 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector
&to_remove,
auto new_tree(make_shared<RowSetTree>());
ModifyRowSetTree(*components_->rowsets, to_remove, to_add, new_tree.get());
- components_ = new TabletComponents(components_->memrowset,
std::move(new_tree));
+ components_ = new TabletComponents(components_->memrowset,
+ components_->txn_memrowsets,
+ std::move(new_tree));
}
Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids,
@@ -1261,11 +1419,12 @@ Status Tablet::FlushUnlocked() {
TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
RETURN_NOT_OK(CheckHasNotBeenStopped());
RowSetsInCompaction input;
- shared_ptr<MemRowSet> old_mrs;
+ vector<shared_ptr<MemRowSet>> old_mrss;
{
// Create a new MRS with the latest schema.
std::lock_guard<rw_spinlock> lock(component_lock_);
- RETURN_NOT_OK(ReplaceMemRowSetUnlocked(&input, &old_mrs));
+ RETURN_NOT_OK(ReplaceMemRowSetsUnlocked(&input, &old_mrss));
+ DCHECK_GE(old_mrss.size(), 1);
}
// Wait for any in-flight ops to finish against the old MRS before we flush
@@ -1274,50 +1433,18 @@ Status Tablet::FlushUnlocked() {
// This may fail if the tablet has been stopped.
RETURN_NOT_OK(mvcc_.WaitForApplyingOpsToApply());
- // Note: "input" should only contain old_mrs.
- return FlushInternal(input, old_mrs);
-}
-
-Status Tablet::ReplaceMemRowSetUnlocked(RowSetsInCompaction *compaction,
- shared_ptr<MemRowSet> *old_ms) {
- *old_ms = components_->memrowset;
- // Mark the memrowset rowset as locked, so compactions won't consider it
- // for inclusion in any concurrent compactions.
- std::unique_lock<std::mutex> ms_lock(*(*old_ms)->compact_flush_lock(),
std::try_to_lock);
- CHECK(ms_lock.owns_lock());
-
- // Add to compaction.
- compaction->AddRowSet(*old_ms, std::move(ms_lock));
-
- shared_ptr<MemRowSet> new_mrs;
- RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
- log_anchor_registry_.get(),
- mem_trackers_.tablet_tracker,
- &new_mrs));
- auto new_rst(make_shared<RowSetTree>());
- ModifyRowSetTree(*components_->rowsets,
- RowSetVector(), // remove nothing
- { *old_ms }, // add the old MRS
- new_rst.get());
- // Swap it in
- components_.reset(new TabletComponents(std::move(new_mrs),
std::move(new_rst)));
- return Status::OK();
-}
-
-Status Tablet::FlushInternal(const RowSetsInCompaction& input,
- const shared_ptr<MemRowSet>& old_ms) {
{
State s;
RETURN_NOT_OK(CheckHasNotBeenStopped(&s));
CHECK(s == kOpen || s == kBootstrapping);
}
- // Step 1. Freeze the old memrowset by blocking readers and swapping
- // it in as a new rowset, replacing it with an empty one.
+ // Freeze the old memrowset by blocking readers and swapping it in as a new
+ // rowset, replacing it with an empty one.
//
- // At this point, we have already swapped in a new empty rowset, and
- // any new inserts are going into that one. 'old_ms' is effectively
- // frozen -- no new inserts should arrive after this point.
+ // At this point, we have already swapped in a new empty rowset, and any new
+ // inserts are going into that one. 'old_ms' is effectively frozen -- no new
+ // inserts should arrive after this point.
//
// NOTE: updates and deletes may still arrive into 'old_ms' at this point.
//
@@ -1325,16 +1452,28 @@ Status Tablet::FlushInternal(const RowSetsInCompaction&
input,
// use to improve iteration performance during the flush. The old design
// used this, but not certain whether it's still doable with the new design.
- uint64_t start_insert_count = old_ms->debug_insert_count();
- int64_t mrs_being_flushed = old_ms->mrs_id();
+ uint64_t start_insert_count = 0;
+ // Keep track of the main MRS.
+ int64_t main_mrs_id = -1;
+ vector<TxnInfoBeingFlushed> txns_being_flushed;
+ for (const auto& old_mrs : old_mrss) {
+ start_insert_count += old_mrs->debug_insert_count();
+ if (old_mrs->txn_id()) {
+ txns_being_flushed.emplace_back(*old_mrs->txn_id());
+ } else {
+ DCHECK_EQ(-1, main_mrs_id);
+ main_mrs_id = old_mrs->mrs_id();
+ }
+ }
+ DCHECK_NE(-1, main_mrs_id);
- if (old_ms->empty()) {
+ if (old_mrss.size() == 1 && old_mrss[0]->empty()) {
// If we're flushing an empty RowSet, we can short circuit here rather than
// waiting until the check at the end of DoCompactionAndFlush(). This
avoids
// the need to create cfiles and write their headers only to later delete
// them.
LOG_WITH_PREFIX(INFO) << "MemRowSet was empty: no flush needed.";
- return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
+ return HandleEmptyCompactionOrFlush(input.rowsets(), main_mrs_id,
txns_being_flushed);
}
if (flush_hooks_) {
@@ -1342,21 +1481,61 @@ Status Tablet::FlushInternal(const RowSetsInCompaction&
input,
"PostSwapNewMemRowSet hook failed");
}
- VLOG_WITH_PREFIX(1) << Substitute("Flush: entering stage 1 (old memrowset"
- "already frozen for inserts). Memstore"
- "in-memory size: $0 bytes",
- old_ms->memory_footprint());
+ if (VLOG_IS_ON(1)) {
+ size_t memory_footprint = 0;
+ for (const auto& old_mrs : old_mrss) {
+ memory_footprint += old_mrs->memory_footprint();
+ }
+ VLOG_WITH_PREFIX(1) << Substitute("Flush: entering stage 1 (old memrowset"
+ "already frozen for inserts). Memstore"
+ "in-memory size: $0 bytes",
+ memory_footprint);
+ }
- RETURN_NOT_OK(DoMergeCompactionOrFlush(input, mrs_being_flushed));
+ RETURN_NOT_OK(DoMergeCompactionOrFlush(input, main_mrs_id,
txns_being_flushed));
+ uint64_t end_insert_count = 0;
+ for (const auto& old_mrs : old_mrss) {
+ end_insert_count += old_mrs->debug_insert_count();
+ }
// Sanity check that no insertions happened during our flush.
- CHECK_EQ(start_insert_count, old_ms->debug_insert_count())
- << "Sanity check failed: insertions continued in memrowset "
- << "after flush was triggered! Aborting to prevent data loss.";
+ CHECK_EQ(start_insert_count, end_insert_count)
+ << "Sanity check failed: insertions continued in memrowset "
+ << "after flush was triggered! Aborting to prevent data loss.";
return Status::OK();
}
+// Swaps in a fresh, empty main MemRowSet. The previous main MemRowSet and all
+// committed transactional MemRowSets are appended to 'old_mrss' (main first),
+// added to 'compaction' together with their compact/flush locks, and inserted
+// into the new rowset tree. 'old_mrss' is expected to be empty on entry.
+Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+                                         vector<shared_ptr<MemRowSet>>* old_mrss) {
+  DCHECK(old_mrss->empty());
+  old_mrss->emplace_back(components_->memrowset);
+  const auto& committed_mrss = components_->txn_memrowsets;
+  old_mrss->insert(old_mrss->end(), committed_mrss.begin(), committed_mrss.end());
+
+  // Mark the memrowsets as locked, so compactions won't consider them for
+  // inclusion in any concurrent compactions.
+  for (const auto& old_mrs : *old_mrss) {
+    std::unique_lock<std::mutex> flush_lock(*old_mrs->compact_flush_lock(), std::try_to_lock);
+    CHECK(flush_lock.owns_lock());
+    compaction->AddRowSet(old_mrs, std::move(flush_lock));
+  }
+
+  shared_ptr<MemRowSet> new_mrs;
+  RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
+                                  log_anchor_registry_.get(),
+                                  mem_trackers_.tablet_tracker,
+                                  &new_mrs));
+
+  // Build the new tree: nothing is removed, and the old MRSs are added.
+  const RowSetVector frozen_mrss(old_mrss->begin(), old_mrss->end());
+  auto new_rst(make_shared<RowSetTree>());
+  ModifyRowSetTree(*components_->rowsets, RowSetVector(), frozen_mrss, new_rst.get());
+
+  // Swap it in. The new components start with no committed transactional MRSs.
+  components_.reset(new TabletComponents(std::move(new_mrs), {}, std::move(new_rst)));
+  return Status::OK();
+}
+
+
Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state,
const Schema* schema) {
@@ -1440,7 +1619,7 @@ Status Tablet::RewindSchemaForBootstrap(const Schema&
new_schema,
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
- components_ = new TabletComponents(new_mrs, old_rowsets);
+ components_ = new TabletComponents(new_mrs, components_->txn_memrowsets,
old_rowsets);
}
return Status::OK();
}
@@ -1646,7 +1825,8 @@ void Tablet::CancelMaintenanceOps() {
Status Tablet::FlushMetadata(const RowSetVector& to_remove,
const RowSetMetadataVector& to_add,
- int64_t mrs_being_flushed) {
+ int64_t mrs_being_flushed,
+ const vector<TxnInfoBeingFlushed>&
txns_being_flushed) {
RowSetMetadataIds to_remove_meta;
for (const shared_ptr<RowSet>& rowset : to_remove) {
// Skip MemRowSet & DuplicatingRowSets which don't have metadata.
@@ -1656,11 +1836,13 @@ Status Tablet::FlushMetadata(const RowSetVector&
to_remove,
to_remove_meta.insert(rowset->metadata()->id());
}
- return metadata_->UpdateAndFlush(to_remove_meta, to_add, mrs_being_flushed);
+ return metadata_->UpdateAndFlush(to_remove_meta, to_add, mrs_being_flushed,
+ txns_being_flushed);
}
Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
- int64_t mrs_being_flushed) {
+ int64_t mrs_being_flushed,
+ const vector<TxnInfoBeingFlushed>&
txns_being_flushed) {
const char *op_name =
(mrs_being_flushed == TabletMetadata::kNoMrsFlushed) ? "Compaction" :
"Flush";
TRACE_EVENT2("tablet", "Tablet::DoMergeCompactionOrFlush",
@@ -1706,7 +1888,8 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
if (drsw.rows_written_count() == 0) {
LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all
input rows "
<< "were GCed!) Removing all input rowsets.";
- return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
+ return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed,
+ txns_being_flushed);
}
// The RollingDiskRowSet writer wrote out one or more RowSets as the
@@ -1856,7 +2039,8 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
}
// Write out the new Tablet Metadata and remove old rowsets.
- RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas,
mrs_being_flushed),
+ RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas,
mrs_being_flushed,
+ txns_being_flushed),
"Failed to flush new tablet metadata");
// Now that we've completed the operation, mark any rowsets that have been
@@ -1891,11 +2075,13 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
}
Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
- int mrs_being_flushed) {
+ int mrs_being_flushed,
+ const vector<TxnInfoBeingFlushed>&
txns_being_flushed) {
// Write out the new Tablet Metadata and remove old rowsets.
RETURN_NOT_OK_PREPEND(FlushMetadata(rowsets,
RowSetMetadataVector(),
- mrs_being_flushed),
+ mrs_being_flushed,
+ txns_being_flushed),
"Failed to flush new tablet metadata");
AtomicSwapRowSets(rowsets, RowSetVector());
@@ -1943,7 +2129,7 @@ Status Tablet::Compact(CompactFlags flags) {
input.DumpToLog();
}
- return DoMergeCompactionOrFlush(input, TabletMetadata::kNoMrsFlushed);
+ return DoMergeCompactionOrFlush(input, TabletMetadata::kNoMrsFlushed, {});
}
void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
@@ -2006,14 +2192,24 @@ Status Tablet::CaptureConsistentIterators(
// in the middle, we don't modify the output arguments.
vector<IterWithBounds> ret;
-
// Grab the memrowset iterator.
- unique_ptr<RowwiseIterator> ms_iter;
- RETURN_NOT_OK(components_->memrowset->NewRowIterator(opts, &ms_iter));
- IterWithBounds mrs_iwb;
- mrs_iwb.iter = std::move(ms_iter);
- ret.emplace_back(std::move(mrs_iwb));
+ {
+ unique_ptr<RowwiseIterator> ms_iter;
+ RETURN_NOT_OK(components_->memrowset->NewRowIterator(opts, &ms_iter));
+ IterWithBounds mrs_iwb;
+ mrs_iwb.iter = std::move(ms_iter);
+ ret.emplace_back(std::move(mrs_iwb));
+ }
+ // Capture any iterators for memrowsets whose inserts were added as a part of
+ // committed transactions.
+ for (const auto& txn_mrs : components_->txn_memrowsets) {
+ unique_ptr<RowwiseIterator> txn_ms_iter;
+ RETURN_NOT_OK(txn_mrs->NewRowIterator(opts, &txn_ms_iter));
+ IterWithBounds txn_mrs_iwb;
+ txn_mrs_iwb.iter = std::move(txn_ms_iter);
+ ret.emplace_back(std::move(txn_mrs_iwb));
+ }
// Cull row-sets in the case of key-range queries.
if (spec != nullptr && (spec->lower_bound_key() ||
spec->exclusive_upper_bound_key())) {
@@ -2080,6 +2276,10 @@ Status Tablet::CountLiveRows(uint64_t* count) const {
uint64_t ret = 0;
uint64_t tmp = 0;
RETURN_NOT_OK(comps->memrowset->CountLiveRows(&ret));
+ for (const auto& mrs : comps->txn_memrowsets) {
+ RETURN_NOT_OK(mrs->CountLiveRows(&tmp));
+ ret += tmp;
+ }
for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
RETURN_NOT_OK(rowset->CountLiveRows(&tmp));
ret += tmp;
@@ -2092,24 +2292,44 @@ size_t Tablet::MemRowSetSize() const {
scoped_refptr<TabletComponents> comps;
GetComponentsOrNull(&comps);
+ size_t ret = 0;
if (comps) {
- return comps->memrowset->memory_footprint();
+ for (const auto& mrs : comps->txn_memrowsets) {
+ ret += mrs->memory_footprint();
+ }
+ ret += comps->memrowset->memory_footprint();
}
- return 0;
+ return ret;
}
bool Tablet::MemRowSetEmpty() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
- return comps->memrowset->empty();
+ const auto& txn_mrss = comps->txn_memrowsets;
+ return comps->memrowset->empty() && std::all_of(txn_mrss.begin(),
txn_mrss.end(),
+ [] (const shared_ptr<MemRowSet>& mrs) { return mrs->empty(); });
}
size_t Tablet::MemRowSetLogReplaySize(const ReplaySizeMap& replay_size_map)
const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
+ auto min_index = comps->memrowset->MinUnflushedLogIndex();
+ for (const auto& mrs : comps->txn_memrowsets) {
+ const auto& mrs_min_index = mrs->MinUnflushedLogIndex();
+ // If the current min isn't valid, set it.
+ if (min_index == -1) {
+ min_index = mrs_min_index;
+ continue;
+ }
+ // If the transaction MRS's min is valid and lower than the current, valid
+ // min, set it.
+ if (mrs_min_index != -1) {
+ min_index = std::min(mrs_min_index, min_index);
+ }
+ }
- return GetReplaySizeForIndex(comps->memrowset->MinUnflushedLogIndex(),
replay_size_map);
+ return GetReplaySizeForIndex(min_index, replay_size_map);
}
size_t Tablet::OnDiskSize() const {
@@ -2569,7 +2789,7 @@ Status Tablet::DeleteAncientDeletedRowsets() {
return Status::OK();
}
RETURN_NOT_OK(HandleEmptyCompactionOrFlush(
- to_delete, TabletMetadata::kNoMrsFlushed));
+ to_delete, TabletMetadata::kNoMrsFlushed, {}));
metrics_->deleted_rowset_gc_bytes_deleted->IncrementBy(bytes_deleted);
metrics_->deleted_rowset_gc_duration->Increment((MonoTime::Now() -
start_time).ToMilliseconds());
return Status::OK();
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index ce57bfb..bce8157 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -25,7 +25,9 @@
#include <mutex>
#include <ostream>
#include <string>
+#include <unordered_map>
#include <unordered_set>
+#include <utility>
#include <vector>
#include <glog/logging.h>
@@ -89,10 +91,12 @@ class MemRowSet;
class ParticipantOpState;
class RowSetTree;
class RowSetsInCompaction;
+class TxnMetadata;
class WriteOpState;
struct RowOp;
struct TabletComponents;
struct TabletMetrics;
+struct TxnRowSets;
class Tablet {
public:
@@ -117,9 +121,14 @@ class Tablet {
~Tablet();
- // Open the tablet, initializing transactions for 'in_flight_txn_ids'.
+ // Open the tablet, initializing transactions for 'in_flight_txn_ids', and
+ // MRSs for 'txn_ids_with_mrs'. The created MRSs will be uncommitted -- it is
+ // up to the caller to determine whether they should be committed after
+ // finishing bootstrapping.
+ //
// Upon completion, the tablet enters the kBootstrapping state.
- Status Open(const std::unordered_set<int64_t>& in_flight_txn_ids =
std::unordered_set<int64_t>{});
+ Status Open(const std::unordered_set<int64_t>& in_flight_txn_ids =
std::unordered_set<int64_t>{},
+ const std::unordered_set<int64_t>& txn_ids_with_mrs =
std::unordered_set<int64_t>{});
// Mark that the tablet has finished bootstrapping.
// This transitions from kBootstrapping to kOpen state.
@@ -153,6 +162,10 @@ class Tablet {
// This also sets the row op's RowSetKeyProbe.
Status AcquireRowLocks(WriteOpState* op_state);
+ // Acquire a shared lock on the given transaction, to ensure the
+ // transaction's state doesn't change while the given write is in flight.
+ Status AcquireTxnLock(int64_t txn_id, WriteOpState* op_state);
+
// Starts an MVCC op which must have a pre-assigned timestamp.
//
// TODO(todd): rename this to something like "FinishPrepare" or
"StartApply", since
@@ -194,11 +207,18 @@ class Tablet {
// using 'txn' as the anchor owner.
void CommitTransaction(Txn* txn, Timestamp commit_ts, const consensus::OpId&
op_id);
+ // Merges the uncommitted transaction rowsets associated with the given
+ // 'txn_id' with the committed rowsets.
+ void CommitTxnRowSets(int64_t txn_id);
+
// Aborts the transaction, recording the abort in the tablet metadata.
// Upon calling this, 'op_id' will be anchored until the metadata is flushed,
// using 'txn' as the anchor owner.
void AbortTransaction(Txn* txn, const consensus::OpId& op_id);
+ // Creates new rowsets for the given transaction.
+ void CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta);
+
// Create a new row iterator which yields the rows as of the current MVCC
// state of this tablet.
// The returned iterator is not initialized.
@@ -521,6 +541,7 @@ class Tablet {
FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithOneRowSet);
FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithNonOverlappingRowSets);
FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithMinimumValueRowSet);
+ FRIEND_TEST(TxnParticipantTest, TestFlushMultipleMRSs);
// Lifecycle states that a Tablet can be in. Legal state transitions for a
// Tablet object:
@@ -638,13 +659,15 @@ class Tablet {
// Performs a merge compaction or a flush.
Status DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
- int64_t mrs_being_flushed);
+ int64_t mrs_being_flushed,
+ const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
// Handle the case in which a compaction or flush yielded no output rows.
// In this case, we just need to remove the rowsets in 'rowsets' from the
// metadata and flush it.
Status HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
- int mrs_being_flushed);
+ int mrs_being_flushed,
+ const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
// Updates the average rowset height metric. Acquires the tablet's
// compact_select_lock_.
@@ -652,7 +675,8 @@ class Tablet {
Status FlushMetadata(const RowSetVector& to_remove,
const RowSetMetadataVector& to_add,
- int64_t mrs_being_flushed);
+ int64_t mrs_being_flushed,
+ const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
static void ModifyRowSetTree(const RowSetTree& old_tree,
const RowSetVector& rowsets_to_remove,
@@ -679,17 +703,13 @@ class Tablet {
*comps = components_;
}
- // Create a new MemRowSet, replacing the current one.
- // The 'old_ms' pointer will be set to the current MemRowSet set before the
replacement.
- // If the MemRowSet is not empty it will be added to the 'compaction' input
- // and the MemRowSet compaction lock will be taken to prevent the inclusion
- // in any concurrent compactions.
- Status ReplaceMemRowSetUnlocked(RowSetsInCompaction *compaction,
- std::shared_ptr<MemRowSet> *old_ms);
-
- // TODO: Document me.
- Status FlushInternal(const RowSetsInCompaction& input,
- const std::shared_ptr<MemRowSet>& old_ms);
+ // Create a new MemRowSet, replacing the current committed one(s).
+ // 'old_mrss' will be populated with the committed MemRowSet(s) that were
+ // current before the replacement. Each of them is added to the 'compaction'
+ // input, and its compaction lock is taken to prevent its inclusion in any
+ // concurrent compactions.
+ Status ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+ std::vector<std::shared_ptr<MemRowSet>>*
old_mrss);
// Convert the specified read client schema (without IDs) to a server schema
(with IDs)
// This method is used by NewRowIterator().
@@ -751,10 +771,13 @@ class Tablet {
// is active, a writer comes along, then all future short readers will be
blocked.
mutable rw_spinlock component_lock_;
- // The current components of the tablet. These should always be read
- // or swapped under the component_lock.
+ // The components of the tablet whose base data has been committed. These
+ // should always be read or swapped under the component_lock.
scoped_refptr<TabletComponents> components_;
+ // Uncommitted transaction state.
+ std::unordered_map<int64_t, scoped_refptr<TxnRowSets>>
uncommitted_rowsets_by_txn_id_;
+
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
TabletMemTrackers mem_trackers_;
@@ -900,11 +923,29 @@ class Tablet::Iterator : public RowwiseIterator {
// change.
struct TabletComponents : public RefCountedThreadSafe<TabletComponents> {
TabletComponents(std::shared_ptr<MemRowSet> mrs,
+ std::vector<std::shared_ptr<MemRowSet>> txn_mrss,
std::shared_ptr<RowSetTree> rs_tree);
+ // The "main" MemRowSet that catches inserts to the tablet that are not a
+ // part of any transaction.
const std::shared_ptr<MemRowSet> memrowset;
+
+ // MemRowSets whose insertion heads were inserted as a part of a transaction.
+ const std::vector<std::shared_ptr<MemRowSet>> txn_memrowsets;
+
+ // The persisted RowSets that comprise the rows of a tablet.
const std::shared_ptr<RowSetTree> rowsets;
};
+// Encapsulates data inserted as a part of a transaction that has not yet been
+// committed.
+// TODO(awong): when we support flushing transactional MRSs before committing,
+// track uncommitted disk rowsets here.
+struct TxnRowSets : public RefCountedThreadSafe<TxnRowSets> {
+  // The MemRowSet that the transaction's inserts are applied to.
+  const std::shared_ptr<MemRowSet> memrowset;
+
+  // Wraps 'mrs' as the transaction's MemRowSet.
+  explicit TxnRowSets(std::shared_ptr<MemRowSet> mrs) : memrowset{std::move(mrs)} {}
+};
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/tablet.proto b/src/kudu/tablet/tablet.proto
index d693633..bb35950 100644
--- a/src/kudu/tablet/tablet.proto
+++ b/src/kudu/tablet/tablet.proto
@@ -35,6 +35,12 @@ message MemStoreTargetPB {
// ... or both of the following fields are set.
optional int64 rs_id = 2 [ default = -1 ];
optional int64 dms_id = 3 [ default = -1 ];
+
+ // If the mutation landed in a rowset whose base data was inserted as a part
+ // of a transaction, the rowset will have a transaction ID associated with
+ // it that differentiates it from rowsets that were not inserted as a part of
+ // a transaction.
+ optional int64 rs_txn_id = 4 [ default = -1 ];
}
// Stores the result of an Insert or Mutate.
diff --git a/src/kudu/tablet/tablet_bootstrap.cc
b/src/kudu/tablet/tablet_bootstrap.cc
index 22e8fb5..55baf55 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -464,6 +464,9 @@ class TabletBootstrap {
std::unordered_set<int64_t> in_flight_txn_ids_;
std::unordered_set<int64_t> terminal_txn_ids_;
+ // Transactions (committed or not) that have active MemRowSets.
+ std::unordered_set<int64_t> mrs_txn_ids_;
+
DISALLOW_COPY_AND_ASSIGN(TabletBootstrap);
};
@@ -612,7 +615,7 @@ Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>*
rebuilt_tablet,
}
RETURN_NOT_OK(flushed_stores_.InitFrom(*tablet_meta_.get()));
- tablet_meta_->GetTxnIds(&in_flight_txn_ids_, &terminal_txn_ids_);
+ tablet_meta_->GetTxnIds(&in_flight_txn_ids_, &terminal_txn_ids_,
&mrs_txn_ids_);
bool has_blocks;
RETURN_NOT_OK(OpenTablet(&has_blocks));
@@ -672,7 +675,7 @@ Status TabletBootstrap::OpenTablet(bool* has_blocks) {
// doing nothing for now except opening a tablet locally.
{
SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet");
- RETURN_NOT_OK(tablet->Open(in_flight_txn_ids_));
+ RETURN_NOT_OK(tablet->Open(in_flight_txn_ids_, mrs_txn_ids_));
}
*has_blocks = tablet->num_rowsets() != 0;
tablet_ = std::move(tablet);
@@ -1567,12 +1570,25 @@ Status
TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_contex
tablet_->txn_participant(),
&replicate_msg->participant_request());
const auto& op_type = op_state.request()->op().type();
+ const auto& txn_id = op_state.txn_id();
if (ContainsKey(terminal_txn_ids_, op_state.txn_id())) {
+ // Even if we're skipping over this participant op because its metadata
+ // state has been persisted, we still need to commit its rowsets if active,
+ // since we may have had to rebuild a committed MRS.
+ if (op_type == ParticipantOpPB::FINALIZE_COMMIT &&
+ ContainsKey(mrs_txn_ids_, txn_id)) {
+ tablet_->CommitTxnRowSets(txn_id);
+ }
return AppendCommitMsg(commit_msg);
}
bool persisted_in_flight = ContainsKey(in_flight_txn_ids_,
op_state.txn_id());
if ((persisted_in_flight && op_type != ParticipantOpPB::BEGIN_TXN) ||
!persisted_in_flight) {
+ // If we're about to create a new MRS, add this transaction ID to the set
+ // that have active MRSs.
+ if (op_type == ParticipantOpPB::BEGIN_TXN) {
+ EmplaceOrDie(&mrs_txn_ids_, txn_id);
+ }
op_state.mutable_op_id()->CopyFrom(replicate_msg->id());
op_state.set_timestamp(Timestamp(replicate_msg->timestamp()));
op_state.AcquireTxnAndLock();
@@ -1718,6 +1734,14 @@ Status TabletBootstrap::FilterOperation(const
OperationResultPB& op_result,
// output targets was active.
int num_active_stores = 0;
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
+ if (mutated_store.has_rs_txn_id()) {
+ // TODO(awong): once we begin flushing before commit, we'll need to have
+ // this account for a last_flushed_mrs_id per transaction.
+ if (ContainsKey(mrs_txn_ids_, mutated_store.rs_txn_id())) {
+ num_active_stores++;
+ }
+ continue;
+ }
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
num_active_stores++;
}
diff --git a/src/kudu/tablet/tablet_metadata.cc
b/src/kudu/tablet/tablet_metadata.cc
index 07be998..579fb1b 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -510,7 +510,8 @@ Status TabletMetadata::LoadFromSuperBlock(const
TabletSuperBlockPB& superblock)
boost::none,
txn_meta.has_commit_timestamp() ?
boost::make_optional(Timestamp(txn_meta.commit_timestamp()))
:
- boost::none
+ boost::none,
+ txn_meta.has_flushed_committed_mrs() &&
txn_meta.flushed_committed_mrs()
));
}
txn_metadata_by_txn_id_ = std::move(txn_metas);
@@ -527,10 +528,11 @@ Status TabletMetadata::LoadFromSuperBlock(const
TabletSuperBlockPB& superblock)
Status TabletMetadata::UpdateAndFlush(const RowSetMetadataIds& to_remove,
const RowSetMetadataVector& to_add,
- int64_t last_durable_mrs_id) {
+ int64_t last_durable_mrs_id,
+ const vector<TxnInfoBeingFlushed>&
txns_being_flushed) {
{
std::lock_guard<LockType> l(data_lock_);
- RETURN_NOT_OK(UpdateUnlocked(to_remove, to_add, last_durable_mrs_id));
+ RETURN_NOT_OK(UpdateUnlocked(to_remove, to_add, last_durable_mrs_id,
txns_being_flushed));
}
return Flush();
}
@@ -642,13 +644,18 @@ Status TabletMetadata::Flush() {
Status TabletMetadata::UpdateUnlocked(
const RowSetMetadataIds& to_remove,
const RowSetMetadataVector& to_add,
- int64_t last_durable_mrs_id) {
+ int64_t last_durable_mrs_id,
+ const vector<TxnInfoBeingFlushed>& txns_being_flushed) {
DCHECK(data_lock_.is_locked());
CHECK_NE(state_, kNotLoadedYet);
if (last_durable_mrs_id != kNoMrsFlushed) {
DCHECK_GE(last_durable_mrs_id, last_durable_mrs_id_);
last_durable_mrs_id_ = last_durable_mrs_id;
}
+ for (const auto& txn_id : txns_being_flushed) {
+ auto txn_meta = FindOrDie(txn_metadata_by_txn_id_, txn_id);
+ txn_meta->set_flushed_committed_mrs_unlocked();
+ }
RowSetMetadataVector new_rowsets = rowsets_;
auto it = new_rowsets.begin();
@@ -752,7 +759,10 @@ Status
TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
if (txn_meta->aborted()) {
meta_pb.set_aborted(true);
}
- InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first,
std::move(meta_pb));
+ if (txn_meta->flushed_committed_mrs_unlocked()) {
+ meta_pb.set_flushed_committed_mrs(true);
+ }
+ InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb);
}
DCHECK(schema_->has_column_ids());
@@ -846,24 +856,37 @@ bool TabletMetadata::HasTxnMetadata(int64_t txn_id) {
}
void TabletMetadata::GetTxnIds(unordered_set<int64_t>* in_flight_txn_ids,
- unordered_set<int64_t>* terminal_txn_ids) {
+ unordered_set<int64_t>* terminal_txn_ids,
+ unordered_set<int64_t>* txn_ids_with_mrs) {
std::unordered_set<int64_t> in_flights;
std::unordered_set<int64_t> terminals;
+ std::unordered_set<int64_t> needs_mrs;
std::lock_guard<LockType> l(data_lock_);
for (const auto& txn_id_and_metadata : txn_metadata_by_txn_id_) {
+ const auto& txn_id = txn_id_and_metadata.first;
const auto& txn_meta = txn_id_and_metadata.second;
if (txn_meta->commit_timestamp() || txn_meta->aborted()) {
if (terminal_txn_ids) {
- EmplaceOrDie(&terminals, txn_id_and_metadata.first);
+ EmplaceOrDie(&terminals, txn_id);
}
} else {
- EmplaceOrDie(&in_flights, txn_id_and_metadata.first);
+ EmplaceOrDie(&in_flights, txn_id);
+ }
+ // If we have not flushed the MRS after committing, the bootstrap process
+ // will need to create an MRS for it, even if the transaction is committed.
+ if (txn_ids_with_mrs &&
+ !txn_meta->flushed_committed_mrs_unlocked() &&
+ !txn_meta->aborted()) {
+ EmplaceOrDie(&needs_mrs, txn_id);
}
}
*in_flight_txn_ids = std::move(in_flights);
if (terminal_txn_ids) {
*terminal_txn_ids = std::move(terminals);
}
+ if (txn_ids_with_mrs) {
+ *txn_ids_with_mrs = std::move(needs_mrs);
+ }
}
unordered_map<int64_t, scoped_refptr<TxnMetadata>>
TabletMetadata::GetTxnMetadata() const {
diff --git a/src/kudu/tablet/tablet_metadata.h
b/src/kudu/tablet/tablet_metadata.h
index 6bba726..25c96fe 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -62,6 +62,12 @@ class TxnMetadata;
typedef std::vector<std::shared_ptr<RowSetMetadata> > RowSetMetadataVector;
typedef std::unordered_set<int64_t> RowSetMetadataIds;
+// A transaction ID whose MRS is being flushed.
+// TODO(awong): If we begin supporting flushing before committing, we should
+// extend this to include the MRS ID so we can define some
+// 'last_durable_mrs_id' per transaction.
+typedef int64_t TxnInfoBeingFlushed;
+
extern const int64_t kNoDurableMemStore;
// Manages the "blocks tracking" for the specified tablet.
@@ -190,7 +196,8 @@ class TabletMetadata : public
RefCountedThreadSafe<TabletMetadata> {
static const int64_t kNoMrsFlushed = -1;
Status UpdateAndFlush(const RowSetMetadataIds& to_remove,
const RowSetMetadataVector& to_add,
- int64_t last_durable_mrs_id);
+ int64_t last_durable_mrs_id,
+ const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed = {});
// Adds the blocks referenced by 'block_ids' to 'orphaned_blocks_'.
//
@@ -266,10 +273,11 @@ class TabletMetadata : public
RefCountedThreadSafe<TabletMetadata> {
// Returns whether a given transaction has metadata.
bool HasTxnMetadata(int64_t txn_id);
- // Returns the transaction IDs that were persisted as being in-flight or
- // terminal.
+ // Returns the transaction IDs that were persisted as being in-flight,
+ // terminal (committed or aborted), and having un-flushed MRSs.
void GetTxnIds(std::unordered_set<int64_t>* in_flight_txn_ids,
- std::unordered_set<int64_t>* terminal_txn_ids = nullptr);
+ std::unordered_set<int64_t>* terminal_txn_ids = nullptr,
+ std::unordered_set<int64_t>* txn_ids_with_mrs = nullptr);
const RowSetMetadataVector& rowsets() const { return rowsets_; }
@@ -371,7 +379,8 @@ class TabletMetadata : public
RefCountedThreadSafe<TabletMetadata> {
// Requires 'data_lock_'.
Status UpdateUnlocked(const RowSetMetadataIds& to_remove,
const RowSetMetadataVector& to_add,
- int64_t last_durable_mrs_id);
+ int64_t last_durable_mrs_id,
+ const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
// Requires 'data_lock_'.
Status ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc
b/src/kudu/tablet/tablet_replica-test-base.cc
index d7a7560..5bb89e2 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -95,6 +95,9 @@ Status TabletReplicaTestBase::ExecuteWrite(TabletReplica*
replica, const WriteRe
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
+ if (resp.per_row_errors_size() > 0) {
+ return StatusFromPB(resp.per_row_errors(0).error());
+ }
return Status::OK();
}
diff --git a/src/kudu/tablet/txn_metadata.h b/src/kudu/tablet/txn_metadata.h
index 177495e..b250181 100644
--- a/src/kudu/tablet/txn_metadata.h
+++ b/src/kudu/tablet/txn_metadata.h
@@ -32,10 +32,23 @@ class TxnMetadata : public
RefCountedThreadSafe<TxnMetadata> {
public:
explicit TxnMetadata(bool aborted = false,
boost::optional<Timestamp> commit_mvcc_op_timestamp =
boost::none,
- boost::optional<Timestamp> commit_ts = boost::none)
+ boost::optional<Timestamp> commit_ts = boost::none,
+ bool flushed_committed_mrs = false)
: aborted_(aborted),
commit_mvcc_op_timestamp_(std::move(commit_mvcc_op_timestamp)),
- commit_timestamp_(std::move(commit_ts)) {}
+ commit_timestamp_(std::move(commit_ts)),
+ flushed_committed_mrs_(flushed_committed_mrs) {}
+
+ // NOTE: access to 'flushed_committed_mrs_' is not inherently threadsafe --
+ // it is expected that the caller will ensure thread safety (e.g.
+ // TabletMetadata only calls these with its 'data_lock_' held).
+ void set_flushed_committed_mrs_unlocked() {
+ flushed_committed_mrs_ = true;
+ }
+ bool flushed_committed_mrs_unlocked() const {
+ return flushed_committed_mrs_;
+ }
+
void set_aborted() {
std::lock_guard<simple_spinlock> l(lock_);
CHECK(boost::none == commit_timestamp_);
@@ -87,6 +100,8 @@ class TxnMetadata : public RefCountedThreadSafe<TxnMetadata>
{
boost::optional<Timestamp> commit_mvcc_op_timestamp_;
boost::optional<Timestamp> commit_timestamp_;
+
+ bool flushed_committed_mrs_;
};
} // namespace tablet
diff --git a/src/kudu/tablet/txn_participant-test.cc
b/src/kudu/tablet/txn_participant-test.cc
index b1229fb..868b0a4 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -33,10 +33,13 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/clock/clock.h"
#include "kudu/common/common.pb.h"
+#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
+#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
@@ -50,8 +53,10 @@
#include "kudu/tablet/ops/op_driver.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/participant_op.h"
+#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_metadata.h"
@@ -59,6 +64,7 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -71,6 +77,7 @@ using kudu::tserver::ParticipantResponsePB;
using kudu::tserver::ParticipantOpPB;
using kudu::tserver::WriteRequestPB;
using std::map;
+using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
@@ -83,8 +90,14 @@ namespace kudu {
namespace tablet {
namespace {
+
+constexpr const int64_t kTxnId = 1;
+
+constexpr const int64_t kTxnOne = 1;
+constexpr const int64_t kTxnTwo = 2;
+
Schema GetTestSchema() {
- return Schema({ ColumnSchema("key", INT32) }, 1);
+ return Schema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);
}
// A participant op that waits to start and finish applying based on input
@@ -129,18 +142,40 @@ class TxnParticipantTest : public TabletReplicaTestBase {
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
}
- Status Write(int key) {
+ Status Write(int key, boost::optional<int64_t> txn_id = boost::none,
+ RowOperationsPB::Type type = RowOperationsPB::INSERT) {
WriteRequestPB req;
+ if (txn_id) {
+ req.set_txn_id(*txn_id);
+ }
req.set_tablet_id(tablet_replica_->tablet_id());
const auto& schema = GetTestSchema();
RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
KuduPartialRow row(&schema);
RETURN_NOT_OK(row.SetInt32(0, key));
+ if (type != RowOperationsPB::DELETE &&
+ type != RowOperationsPB::DELETE_IGNORE) {
+ RETURN_NOT_OK(row.SetInt32(1, key));
+ }
RowOperationsPBEncoder enc(req.mutable_row_operations());
- enc.Add(RowOperationsPB::INSERT, row);
+ enc.Add(type, row);
return ExecuteWrite(tablet_replica_.get(), req);
}
+ Status Delete(int key) {
+ return Write(key, boost::none, RowOperationsPB::DELETE);
+ }
+
+ Status CallParticipantOpCheckResp(int64_t txn_id,
ParticipantOpPB::ParticipantOpType op_type,
+ int64_t ts_val) {
+ ParticipantResponsePB resp;
+ RETURN_NOT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op_type,
ts_val, &resp));
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ return Status::OK();
+ }
+
// Writes an op to the WAL, rolls over onto a new WAL segment, and flushes
// the MRS, leaving us with a new WAL segment that should be GC-able unless
// previous WAL segments are anchored.
@@ -154,6 +189,20 @@ class TxnParticipantTest : public TabletReplicaTestBase {
TxnParticipant* txn_participant() {
return tablet_replica_->tablet()->txn_participant();
}
+
+ Status IterateToStrings(vector<string>* ret) {
+ unique_ptr<RowwiseIterator> iter;
+ RETURN_NOT_OK(tablet_replica_->tablet()->NewRowIterator(GetTestSchema(),
&iter));
+ RETURN_NOT_OK(iter->Init(nullptr));
+ vector<string> out;
+ RETURN_NOT_OK(IterateToStringList(iter.get(), &out));
+ *ret = std::move(out);
+ return Status::OK();
+ }
+
+ clock::Clock* clock() {
+ return tablet_replica_->tablet()->clock();
+ }
};
TEST_F(TxnParticipantTest, TestSuccessfulSequences) {
@@ -218,7 +267,6 @@ TEST_F(TxnParticipantTest, TestTransactionNotFound) {
}
TEST_F(TxnParticipantTest, TestIllegalTransitions) {
- const int64_t kTxnId = 1;
const auto check_valid_op = [&] (const ParticipantOpPB::ParticipantOpType&
type, int64_t txn_id) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
@@ -308,7 +356,6 @@ TEST_F(TxnParticipantTest, TestConcurrentTransactions) {
// Concurrently try to apply every op and test, based on the results, that some
// invariants are maintained.
TEST_F(TxnParticipantTest, TestConcurrentOps) {
- const int64_t kTxnId = 1;
const map<ParticipantOpPB::ParticipantOpType, int> kIndexByOps = {
{ ParticipantOpPB::BEGIN_TXN, 0 },
{ ParticipantOpPB::BEGIN_COMMIT, 1},
@@ -370,7 +417,6 @@ TEST_F(TxnParticipantTest, TestConcurrentOps) {
}
TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
- constexpr const int64_t kTxnId = 1;
for (const auto& type : kCommitSequence) {
ParticipantResponsePB resp;
ASSERT_OK(CallParticipantOp(
@@ -397,9 +443,7 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
const auto check_participant_ops_are_anchored =
[&] (int64_t txn_id, const vector<ParticipantOpPB::ParticipantOpType>&
ops) {
for (const auto& op : ops) {
- ParticipantResponsePB resp;
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op,
- kDummyCommitTimestamp, &resp));
+ ASSERT_OK(CallParticipantOpCheckResp(txn_id, op,
kDummyCommitTimestamp));
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
expected_index++;
if (op == ParticipantOpPB::BEGIN_COMMIT) {
@@ -429,7 +473,6 @@ TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
// restarts, and that the appropriate anchors are in place as we progress
// through a transaction's life cycle.
TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
- const int64_t kTxnId = 1;
// First, do a sanity check that there's nothing GCable.
int64_t gcable_size;
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
@@ -437,10 +480,8 @@ TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart)
{
// Perform some initial participant ops and roll the WAL segments so there
// are some candidates for WAL GC.
- ParticipantResponsePB resp;
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
+ kDummyCommitTimestamp));
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
@@ -464,9 +505,8 @@ TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
// Write and flush a BEGIN_COMMIT op. Once we GC, our WAL will start on this
// op, and WALs should be anchored until the commit is finalized, regardless
// of whether there are more segments.
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp));
ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
// There should be two anchors for this op: one that is in place until the
@@ -494,9 +534,8 @@ TEST_F(TxnParticipantTest, TestTxnMetadataSurvivesRestart) {
// Once we finalize the commit, the BEGIN_COMMIT anchor should be released.
ASSERT_EQ(1,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp));
ASSERT_EQ(1,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
ASSERT_EQ(0,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
@@ -523,18 +562,13 @@ TEST_F(TxnParticipantTest,
TestTxnMetadataSurvivesRestart) {
// Test that we can replay BEGIN_COMMIT ops, given it anchors WALs until
// metadata flush _and_ until the transaction is finalized or aborted.
TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFlush) {
- const int64_t kTxnId = 1;
- ParticipantResponsePB resp;
// Start a transaction and begin committing.
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp));
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
auto txn_meta =
FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId);
ASSERT_EQ(boost::none, txn_meta->commit_mvcc_op_timestamp());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp));
// We should have two anchors: one that lasts until we flush, another that
// lasts until we finalize the commit.
ASSERT_EQ(2,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
@@ -554,9 +588,8 @@ TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFlush) {
ASSERT_EQ(1,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
ASSERT_OK(RestartReplica(/*reset_tablet*/true));
ASSERT_EQ(1,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp));
// The anchor from the BEGIN_COMMIT op should be gone, but we should have
// another anchor for the FINALIZE_COMMIT op until we flush the metadata.
@@ -576,17 +609,12 @@ TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFlush)
{
// Like the above test but finalizing the commit before flushing the metadata.
TEST_F(TxnParticipantTest, TestBeginCommitAnchorsOnFinalize) {
- const int64_t kTxnId = 1;
- ParticipantResponsePB resp;
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp));
auto txn_meta =
FindOrDie(tablet_replica_->tablet_metadata()->GetTxnMetadata(), kTxnId);
ASSERT_EQ(boost::none, txn_meta->commit_mvcc_op_timestamp());
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp));
ASSERT_NE(boost::none, txn_meta->commit_mvcc_op_timestamp());
const auto orig_mvcc_op_timestamp = *txn_meta->commit_mvcc_op_timestamp();
@@ -600,9 +628,8 @@ TEST_F(TxnParticipantTest,
TestBeginCommitAnchorsOnFinalize) {
// We should have two anchors, one that lasts until we flush, another that
// lasts until we finalize.
ASSERT_EQ(2,
tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
- ParticipantOpPB::FINALIZE_COMMIT,
kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp));
// Finalizing the commit shouldn't affect our metadata.
txn_meta.reset();
@@ -624,11 +651,7 @@ class MetadataFlushTxnParticipantTest : public
TxnParticipantTest,
// Test rebuilding transaction state from the WALs and metadata.
TEST_P(MetadataFlushTxnParticipantTest, TestRebuildTxnMetadata) {
const bool should_flush = GetParam();
- const int64_t kTxnId = 1;
- ParticipantResponsePB resp;
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp));
if (should_flush) {
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
}
@@ -637,9 +660,8 @@ TEST_P(MetadataFlushTxnParticipantTest,
TestRebuildTxnMetadata) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kOpen, -1 }
}), txn_participant()->GetTxnsForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp));
if (should_flush) {
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
}
@@ -648,9 +670,8 @@ TEST_P(MetadataFlushTxnParticipantTest,
TestRebuildTxnMetadata) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitInProgress, -1 }
}), txn_participant()->GetTxnsForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp));
if (should_flush) {
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
}
@@ -662,9 +683,8 @@ TEST_P(MetadataFlushTxnParticipantTest,
TestRebuildTxnMetadata) {
// Now perform the same validation but for a transaction that gets aborted.
const int64_t kAbortedTxnId = 2;
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kAbortedTxnId,
ParticipantOpPB::BEGIN_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::BEGIN_TXN,
+ kDummyCommitTimestamp));
if (should_flush) {
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
}
@@ -673,9 +693,8 @@ TEST_P(MetadataFlushTxnParticipantTest,
TestRebuildTxnMetadata) {
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp },
{ kAbortedTxnId, Txn::kOpen, -1 }
}), txn_participant()->GetTxnsForTests());
- ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kAbortedTxnId,
ParticipantOpPB::ABORT_TXN,
- kDummyCommitTimestamp, &resp));
- ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::ABORT_TXN,
+ kDummyCommitTimestamp));
if (should_flush) {
ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
}
@@ -685,12 +704,83 @@ TEST_P(MetadataFlushTxnParticipantTest,
TestRebuildTxnMetadata) {
{ kAbortedTxnId, Txn::kAborted, -1 }
}), txn_participant()->GetTxnsForTests());
}
+
+// Test rebuilding transaction state, including writes, from WALs and metadata.
+TEST_P(MetadataFlushTxnParticipantTest, TestReplayTransactionalInserts) {
+ const bool should_flush = GetParam();
+ constexpr const int64_t kAbortedTxnId = 2;
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::BEGIN_TXN, -1));
+ ASSERT_OK(Write(0, kTxnId));
+ ASSERT_OK(Write(0, kAbortedTxnId));
+ if (should_flush) {
+ ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
+ }
+
+ // As long as we haven't finalized the transaction, we shouldn't be able to
+ // iterate through its mutations, even across restarts.
+ vector<string> rows;
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::BEGIN_COMMIT, -1));
+ if (should_flush) {
+ ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
+ }
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::ABORT_TXN,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ // Once we've committed the transaction, we should see the rows.
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ if (should_flush) {
+ ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
+ }
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+}
+
+// Test replaying mutations to transactional MRSs.
+TEST_P(MetadataFlushTxnParticipantTest, TestReplayUpdatesToTransactionalMRS) {
+ const bool should_flush = GetParam();
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnId));
+ ASSERT_OK(Write(1, kTxnId));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ if (should_flush) {
+ ASSERT_OK(tablet_replica_->tablet_metadata()->Flush());
+ }
+ ASSERT_OK(Delete(0));
+ vector<string> rows;
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+}
+
INSTANTIATE_TEST_CASE_P(ShouldFlushMetadata, MetadataFlushTxnParticipantTest,
::testing::Values(true, false));
// Similar to the above test, but checking that in-flight ops anchor the WALs.
TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
- const int64_t kTxnId = 1;
ParticipantRequestPB req;
ParticipantResponsePB resp;
auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId,
ParticipantOpPB::BEGIN_TXN,
@@ -700,6 +790,7 @@ TEST_F(TxnParticipantTest,
TestActiveParticipantOpsAnchorWALs) {
CountDownLatch apply_continue(1);
op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch,
&resp)));
+
scoped_refptr<OpDriver> driver;
unique_ptr<DelayedParticipantOp> op(
new DelayedParticipantOp(&apply_start, &apply_continue,
std::move(op_state)));
@@ -735,6 +826,7 @@ TEST_F(TxnParticipantTest,
TestActiveParticipantOpsAnchorWALs) {
// Add some segments to ensure there are enough segments to GC.
ASSERT_OK(WriteRolloverAndFlush(current_key++));
ASSERT_OK(WriteRolloverAndFlush(current_key++));
+ ASSERT_EQ(0, gcable_size);
ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
ASSERT_GT(gcable_size, 0);
@@ -751,5 +843,510 @@ TEST_F(TxnParticipantTest,
TestActiveParticipantOpsAnchorWALs) {
}), txn_participant()->GetTxnsForTests());
}
+// Test that we can only write to transactions if they are open.
+TEST_F(TxnParticipantTest, TestWriteToOpenTransactionsOnly) {
+ constexpr const int64_t kAbortedTxnId = 2;
+ Status s = Write(0, kTxnId);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnId));
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ // Even if the row already exists, we shouldn't get an AlreadyPresent error;
+ // the transaction's state is checked much earlier than the presence check.
+ s = Write(0, kTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ s = Write(1, kTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp));
+ s = Write(0, kTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ s = Write(1, kTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::BEGIN_TXN, -1));
+ ASSERT_OK(Write(2, kAbortedTxnId));
+ ASSERT_OK(CallParticipantOpCheckResp(kAbortedTxnId,
ParticipantOpPB::ABORT_TXN, -1));
+ s = Write(2, kAbortedTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ s = Write(3, kAbortedTxnId);
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+}
+
+// Test that we get an appropriate error when attempting transactional ops that
+// are not supported.
+TEST_F(TxnParticipantTest, TestUnsupportedOps) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ Status s = Write(0, kTxnId, RowOperationsPB::UPSERT);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ s = Write(0, kTxnId, RowOperationsPB::UPDATE);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ s = Write(0, kTxnId, RowOperationsPB::UPDATE_IGNORE);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+
+ // None of the ops should have done anything.
+ ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail());
+
+ s = Write(0, kTxnId, RowOperationsPB::DELETE);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+ s = Write(0, kTxnId, RowOperationsPB::DELETE_IGNORE);
+ ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
+}
+
+// Test that rows inserted to transactional stores only show up when the
+// transactions complete.
+TEST_F(TxnParticipantTest, TestInsertToTransactionMRS) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnOne));
+ ASSERT_OK(Write(1, kTxnTwo));
+ ASSERT_OK(Write(2, kTxnTwo));
+
+ vector<string> rows;
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ // Only after we finalize a transaction's commit should we see its rows.
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(3, rows.size());
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(3, rows.size());
+}
+
+// Test that rows inserted to transactional stores don't show up if the
+// transaction is aborted.
+TEST_F(TxnParticipantTest, TestDontReadAbortedInserts) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnOne));
+ ASSERT_OK(Write(1, kTxnTwo));
+
+ vector<string> rows;
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ // Even if we begin committing, if the transaction is ultimately aborted, we
+ // should see nothing.
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::ABORT_TXN,
-1));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::ABORT_TXN,
-1));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+}
+
+// Test that rows inserted as a part of a transaction cannot be updated if the
+// transaction is aborted.
+TEST_F(TxnParticipantTest, TestUpdateAfterAborting) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnId));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::ABORT_TXN,
-1));
+ Status s = Write(0, boost::none, RowOperationsPB::UPDATE);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_OK(Write(0, boost::none, RowOperationsPB::UPDATE_IGNORE));
+ ASSERT_EQ(1,
tablet_replica_->tablet()->metrics()->update_ignore_errors->value());
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail());
+
+ s = Write(0, boost::none, RowOperationsPB::DELETE);
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_OK(Write(0, boost::none, RowOperationsPB::DELETE_IGNORE));
+ ASSERT_EQ(1,
tablet_replica_->tablet()->metrics()->delete_ignore_errors->value());
+ ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+ ASSERT_EQ(0, tablet_replica_->CountLiveRowsNoFail());
+
+ ASSERT_OK(Write(0, boost::none, RowOperationsPB::UPSERT));
+ ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail());
+}
+
+// Test that we can update rows that were inserted and committed as a part of a
+// transaction.
+TEST_F(TxnParticipantTest, TestUpdateCommittedTransactionMRS) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0, kTxnId));
+
+ // Since we haven't committed yet, we should see no rows.
+ vector<string> rows;
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ Status s = Delete(0);
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+
+ // We still haven't finished committing, so we should see no rows.
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+ s = Delete(0);
+ ASSERT_TRUE(s.IsNotFound());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+
+ // We should be able to update committed, transactional stores.
+ ASSERT_OK(Delete(0));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ // We should be able to re-insert the deleted row, even though it was
+ // originally written as a part of a transaction.
+ ASSERT_OK(Write(0));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+}
+
+// Test that we can flush multiple MRSs, and that when restarting, ops are
+// replayed (or not) as appropriate.
+TEST_F(TxnParticipantTest, TestFlushMultipleMRSs) {
+ const int kNumTxns = 3;
+ const int kNumRowsPerTxn = 100;
+ vector<string> rows;
+ Tablet* tablet = tablet_replica_->tablet();
+ scoped_refptr<TabletComponents> comps;
+ for (int t = 0; t < kNumTxns; t++) {
+ ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::BEGIN_TXN,
kDummyCommitTimestamp));
+
+ // Since we haven't committed anything, the tablet components shouldn't
+ // have any transactional MRSs.
+ tablet->GetComponents(&comps);
+ ASSERT_TRUE(comps->txn_memrowsets.empty());
+ }
+ for (int t = 0; t < kNumTxns; t++) {
+ for (int r = 0; r < kNumRowsPerTxn; r++) {
+ ASSERT_OK(Write(t * kNumRowsPerTxn + r, t));
+ }
+ ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::BEGIN_COMMIT,
kDummyCommitTimestamp));
+ ASSERT_OK(CallParticipantOpCheckResp(t, ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ((t + 1) * kNumRowsPerTxn, rows.size());
+ tablet->GetComponents(&comps);
+ ASSERT_EQ(t + 1, comps->txn_memrowsets.size());
+ }
+ // After restarting, we should have the same number of rows and MRSs.
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ tablet = tablet_replica_->tablet();
+
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size());
+ tablet->GetComponents(&comps);
+ ASSERT_EQ(kNumTxns, comps->txn_memrowsets.size());
+
+ // Once flushed, we should have the same number of rows, but no txn MRSs.
+ ASSERT_OK(tablet_replica_->tablet()->Flush());
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size());
+ tablet->GetComponents(&comps);
+ ASSERT_TRUE(comps->txn_memrowsets.empty());
+
+ // The verifications should hold after restarting the replica after flushing.
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ tablet = tablet_replica_->tablet();
+
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(kNumTxns * kNumRowsPerTxn, rows.size());
+ tablet->GetComponents(&comps);
+ ASSERT_TRUE(comps->txn_memrowsets.empty());
+}
+
+// Test that INSERT_IGNORE ops work when the row exists in the transactional
+// MRS.
+TEST_F(TxnParticipantTest, TestInsertIgnoreInTransactionMRS) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+
+ // Insert into a new transactional MRS, and then INSERT_IGNORE as a part of a
+ // transaction.
+ vector<string> rows;
+ ASSERT_OK(Write(0, kTxnId));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_TRUE(rows.empty());
+
+ Status s = Write(0, kTxnId);
+ ASSERT_TRUE(s.IsAlreadyPresent());
+ ASSERT_EQ(0,
tablet_replica_->tablet()->metrics()->insert_ignore_errors->value());
+
+ ASSERT_OK(Write(0, kTxnId, RowOperationsPB::INSERT_IGNORE));
+ ASSERT_EQ(1,
tablet_replica_->tablet()->metrics()->insert_ignore_errors->value());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+}
+
+// Test that INSERT_IGNORE ops work when the row exists in the main MRS.
+TEST_F(TxnParticipantTest, TestInsertIgnoreInMainMRS) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ // Insert into the main MRS, and then INSERT_IGNORE as a part of a
+ // transaction.
+ vector<string> rows;
+ ASSERT_OK(Write(0));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+
+ Status s = Write(0, kTxnId);
+ ASSERT_TRUE(s.IsAlreadyPresent());
+ ASSERT_EQ(0,
tablet_replica_->tablet()->metrics()->insert_ignore_errors->value());
+
+ ASSERT_OK(Write(0, kTxnId, RowOperationsPB::INSERT_IGNORE));
+ ASSERT_EQ(1,
tablet_replica_->tablet()->metrics()->insert_ignore_errors->value());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(1, rows.size());
+}
+
+// Test that the live row count accounts for transactional MRSs.
+TEST_F(TxnParticipantTest, TestLiveRowCountAccountsForTransactionalMRSs) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(Write(0));
+ ASSERT_OK(Write(1, kTxnOne));
+ ASSERT_OK(Write(2, kTxnTwo));
+ ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_EQ(2, tablet_replica_->CountLiveRowsNoFail());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_EQ(2, tablet_replica_->CountLiveRowsNoFail());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_EQ(3, tablet_replica_->CountLiveRowsNoFail());
+ ASSERT_OK(Delete(1));
+ ASSERT_OK(Delete(2));
+ ASSERT_EQ(1, tablet_replica_->CountLiveRowsNoFail());
+}
+
+// Test that the MRS size metrics account for transactional MRSs.
+TEST_F(TxnParticipantTest, TestSizeAccountsForTransactionalMRS) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_TRUE(tablet_replica_->tablet()->MemRowSetEmpty());
+
+ auto* tablet = tablet_replica_->tablet();
+ auto mrs_size_with_empty = tablet->MemRowSetSize();
+
+ ASSERT_OK(Write(1, kTxnOne));
+ ASSERT_OK(Write(2, kTxnTwo));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_TRUE(tablet->MemRowSetEmpty());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ auto mrs_size_with_one = tablet->MemRowSetSize();
+ ASSERT_GT(mrs_size_with_one, mrs_size_with_empty);
+ ASSERT_FALSE(tablet->MemRowSetEmpty());
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ auto mrs_size_with_two = tablet->MemRowSetSize();
+ ASSERT_GT(mrs_size_with_two, mrs_size_with_one);
+
+ // The MRSs shouldn't be considered empty even if their rows are deleted,
+ // since they still contain mutations.
+ ASSERT_OK(Delete(1));
+ ASSERT_OK(Delete(2));
+ ASSERT_FALSE(tablet_replica_->tablet()->MemRowSetEmpty());
+}
+
+// Test that the MRS anchored WALs metric accounts for transactional MRSs.
+TEST_F(TxnParticipantTest, TestWALsAnchoredAccountsForTransactionalMRS) {
+ const auto mrs_wal_size = [&] {
+ map<int64_t, int64_t> replay_size_map;
+ CHECK_OK(tablet_replica_->GetReplaySizeMap(&replay_size_map));
+ return tablet_replica_->tablet()->MemRowSetLogReplaySize(replay_size_map);
+ };
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+
+ // Write a row and roll over onto a new WAL segment so there are bytes to GC.
+ ASSERT_OK(Write(0, kTxnOne));
+ ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+ ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+
+ // Nothing should be considered anchored, as the transaction hasn't been
+ // committed -- it thus wouldn't make sense to perform maintenance ops based
+ // on WAL segments for the uncommitted write.
+ ASSERT_EQ(0, mrs_wal_size());
+
+ // Once we commit, we should see some GCable bytes.
+ ASSERT_OK(Write(1, kTxnOne));
+ ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+ ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ auto mrs_wal_size_with_first_committed = mrs_wal_size();
+ ASSERT_GT(mrs_wal_size_with_first_committed, 0);
+
+ ASSERT_OK(Write(2, kTxnTwo));
+ ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+ ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+ auto mrs_wal_size_with_both_written = mrs_wal_size();
+
+ // Despite not having committed the second transaction, we still wrote new
+ // WAL segments, and that's enough to bump the MRS WALs anchored value.
+ ASSERT_GT(mrs_wal_size_with_both_written, mrs_wal_size_with_first_committed);
+
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+
+ auto mrs_wal_size_with_both_committed = mrs_wal_size();
+ ASSERT_EQ(mrs_wal_size_with_both_committed, mrs_wal_size_with_both_written);
+}
+
+// Test racing writes with commits, ensuring that we cease writing once
+// beginning to commit.
+TEST_F(TxnParticipantTest, TestRacingCommitAndWrite) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_TXN,
-1));
+ int first_error_row = -1;
+ CountDownLatch first_write(1);
+ thread t([&] {
+ for (int row = 0 ;; row++) {
+ Status s = Write(row, kTxnId);
+ if (!s.ok()) {
+ first_error_row = row;
+ break;
+ }
+ first_write.CountDown();
+ }
+ });
+ first_write.Wait();
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnId,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ t.join();
+ ASSERT_GT(first_error_row, 0);
+ ASSERT_EQ(first_error_row, tablet_replica_->CountLiveRowsNoFail());
+}
+
+// Test that the write metrics account for transactional rowsets.
+TEST_F(TxnParticipantTest, TestMRSLookupsMetric) {
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_TXN,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnTwo, ParticipantOpPB::BEGIN_TXN,
-1));
+
+ // A non-transactional write should not check any MRSs -- it should just be
+ // inserted into the main MRS.
+ ASSERT_OK(Write(0));
+ ASSERT_EQ(0, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+
+ // A transactional write will check the main MRS before trying to insert to
+ // the transactional MRS.
+ ASSERT_OK(Write(1, kTxnOne));
+ ASSERT_EQ(1, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+ ASSERT_OK(Write(2, kTxnTwo));
+ ASSERT_EQ(2, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+
+ // Once a transaction is committed, its MRS and the main MRS will be checked
+ // for new transactional writes.
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne, ParticipantOpPB::BEGIN_COMMIT,
-1));
+ ASSERT_OK(CallParticipantOpCheckResp(kTxnOne,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(Write(3, kTxnTwo));
+ ASSERT_EQ(4, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+
+ // Non-transactional writes will only check the committed transactional MRS,
+ // before attempting to insert to the main MRS.
+ ASSERT_OK(Write(4));
+ ASSERT_EQ(5, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+
+ // Trying to delete a row that doesn't exist will consult just the committed
+ // transactional MRS before attempting to delete from the main MRS.
+ Status s = Delete(10);
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_EQ(6, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+
+ // When deleting a row that exists in a MRS, the committed transactional MRS
+ // is checked, and the successful deletion from the MRS increments the lookup
+ // value, regardless of which MRS the row is in.
+ ASSERT_OK(Delete(0));
+ ASSERT_EQ(8, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+ ASSERT_OK(Delete(1));
+ ASSERT_EQ(10, tablet_replica_->tablet()->metrics()->mrs_lookups->value());
+}
+
+struct ConcurrencyParams {
+ int num_txns;
+ int num_rows_per_thread;
+};
+class TxnParticipantConcurrencyTest : public TxnParticipantTest,
+ public
::testing::WithParamInterface<ConcurrencyParams> {};
+
+// Test inserting into multiple transactions from multiple threads.
+TEST_P(TxnParticipantConcurrencyTest, TestConcurrentDisjointInsertsTxn) {
+ const auto& params = GetParam();
+ const auto& num_txns = params.num_txns;
+ const int kNumThreads = 10;
+ const auto& rows_per_thread = params.num_rows_per_thread;
+ for (int txn_id = 0; txn_id < num_txns; txn_id++) {
+ ASSERT_OK(CallParticipantOpCheckResp(txn_id, ParticipantOpPB::BEGIN_TXN,
-1));
+ }
+ // Insert to multiple transactions concurrently.
+ vector<thread> threads;
+ for (int i = 0; i < kNumThreads; i++) {
+ threads.emplace_back([&, i] {
+ for (int r = 0; r < rows_per_thread; r++) {
+ int row = i * rows_per_thread + r;
+ ASSERT_OK(Write(row, row % num_txns));
+ }
+ });
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+ vector<string> rows;
+ for (int txn_id = 0; txn_id < num_txns; txn_id++) {
+ ASSERT_OK(CallParticipantOpCheckResp(txn_id,
ParticipantOpPB::BEGIN_COMMIT, -1));
+ }
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(0, rows.size());
+
+ // As we commit our transactions, we should see more and more rows show up.
+ for (int txn_id = 0; txn_id < num_txns; txn_id++) {
+ ASSERT_OK(CallParticipantOpCheckResp(txn_id,
ParticipantOpPB::FINALIZE_COMMIT,
+ clock()->Now().value()));
+ ASSERT_OK(IterateToStrings(&rows));
+ ASSERT_EQ(kNumThreads * rows_per_thread * (txn_id + 1) / num_txns,
rows.size());
+ }
+}
+INSTANTIATE_TEST_CASE_P(ConcurrencyParams, TxnParticipantConcurrencyTest,
+ ::testing::Values(
+ ConcurrencyParams{ /*num_txns*/1, /*num_rows_per_thread*/1 },
+ ConcurrencyParams{ /*num_txns*/10, /*num_rows_per_thread*/1 },
+ ConcurrencyParams{ /*num_txns*/1, /*num_rows_per_thread*/10 }
+ ));
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/txn_participant.cc
b/src/kudu/tablet/txn_participant.cc
index d498fc9..9e86797 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -55,6 +55,11 @@ void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>*
txn_lock) {
*txn_lock = std::move(l);
}
+void Txn::AcquireReadLock(shared_lock<rw_semaphore>* txn_lock) {
+ shared_lock<rw_semaphore> l(state_lock_);
+ *txn_lock = std::move(l);
+}
+
void TxnParticipant::CreateOpenTransaction(int64_t txn_id,
LogAnchorRegistry*
log_anchor_registry) {
std::lock_guard<simple_spinlock> l(lock_);
@@ -70,6 +75,11 @@ scoped_refptr<Txn>
TxnParticipant::GetOrCreateTransaction(int64_t txn_id,
tablet_metadata_);
}
+scoped_refptr<Txn> TxnParticipant::GetTransaction(int64_t txn_id) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return FindPtrOrNull(txns_, txn_id);
+}
+
void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
std::lock_guard<simple_spinlock> l(lock_);
Txn* txn = FindPointeeOrNull(txns_, txn_id);
diff --git a/src/kudu/tablet/txn_participant.h
b/src/kudu/tablet/txn_participant.h
index 795e712..3cea176 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -101,10 +101,11 @@ class Txn : public RefCountedThreadSafe<Txn> {
commit_timestamp_(-1) {}
~Txn();
- // Takes the state lock in write mode and returns it. As transaction state is
- // meant to be driven via an op driver, lock acquisition is expected to be
- // serialized in a single thread.
+ // Takes the state lock and returns it. As transaction state is meant to be
+ // driven via an op driver, lock acquisition is expected to be serialized in
+ // a single thread.
void AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock);
+ void AcquireReadLock(shared_lock<rw_semaphore>* txn_lock);
// Validates that the transaction is in the appropriate state to perform the
// given operation. Should be called while holding the state lock before
@@ -272,11 +273,15 @@ class TxnParticipant {
void CreateOpenTransaction(int64_t txn_id,
log::LogAnchorRegistry* log_anchor_registry);
- // Gets the transaction state for the given transaction ID, creating it in
+ // Gets the transaction for the given transaction ID, creating it in
// the kInitializing state if one doesn't already exist.
scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id,
log::LogAnchorRegistry*
log_anchor_registry);
+ // Gets the transaction for the given transaction ID, or returns null if it
+ // does not exist.
+ scoped_refptr<Txn> GetTransaction(int64_t txn_id);
+
// Removes the given transaction if it failed to initialize, e.g. the op that
// created it failed to replicate, leaving it in the kInitializing state but
// with no op actively mutating it.
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 11c1f08..f6118b9 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -3135,10 +3135,11 @@ Status TabletServiceImpl::HandleScanAtSnapshot(const
NewScanRequestPB& scan_pb,
s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline);
tablet::MvccSnapshot snap;
+ auto* mvcc_manager = tablet->mvcc_manager();
if (PREDICT_TRUE(s.ok())) {
// Wait for the in-flights in the snapshot to be finished.
TRACE("Waiting for operations to commit");
- s = tablet->mvcc_manager()->WaitForSnapshotWithAllApplied(
+ s = mvcc_manager->WaitForSnapshotWithAllApplied(
tmp_snap_timestamp, &snap, client_deadline);
}
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index 9dd51c0..e51d190 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -103,6 +103,9 @@ message TabletServerErrorPB {
// The request is disallowed for the given user.
NOT_AUTHORIZED = 21;
+
+ // The requested transaction is not in the appropriate state.
+ TXN_ILLEGAL_STATE = 22;
}
// The error code.
diff --git a/src/kudu/util/locks.h b/src/kudu/util/locks.h
index f70955c..1ee8544 100644
--- a/src/kudu/util/locks.h
+++ b/src/kudu/util/locks.h
@@ -275,7 +275,12 @@ class shared_lock {
}
void swap(shared_lock& other) {
- std::swap(m_,other.m_);
+ std::swap(m_, other.m_);
+ }
+
+ shared_lock& operator=(shared_lock&& other) noexcept {
+ swap(other);
+ return *this;
}
~shared_lock() {