This is an automated email from the ASF dual-hosted git repository.

bankim pushed a commit to branch branch-1.15.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 612e2523ea819d677abd8d7e3d935a2fb43118f9
Author: Andrew Wong <[email protected]>
AuthorDate: Thu May 27 16:58:52 2021 -0700

    [docs] add a design overview for multi-row transactions

    This adds a design overview for the multi-row transactions feature. The
    goal is to describe the implementation of various components of the
    project to give a better high-level understanding of how the feature
    comes together.

    Rendered version:
    https://github.com/andrwng/kudu/blob/txns_docs/docs/design-docs/transactions.adoc

    Change-Id: I14c5a8cbd2b239c68e355910e9a6de4576508dd6
    Reviewed-on: http://gerrit.cloudera.org:8080/17525
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
    (cherry picked from commit be80cb291b06616702a599b12534822345f73302)
    Reviewed-on: http://gerrit.cloudera.org:8080/17536
    Reviewed-by: Bankim Bhavsar <[email protected]>
---
 docs/design-docs/design-docs_transactions.svg |   1 +
 docs/design-docs/transactions.adoc            | 430 ++++++++++++++++++++++++++
 2 files changed, 431 insertions(+)

diff --git a/docs/design-docs/design-docs_transactions.svg b/docs/design-docs/design-docs_transactions.svg
new file mode 100644
index 0000000..bae44b8
--- /dev/null
+++ b/docs/design-docs/design-docs_transactions.svg
@@ -0,0 +1 @@
[single-line SVG markup of the commit-path diagram omitted]
\ No newline at end of file
diff --git a/docs/design-docs/transactions.adoc b/docs/design-docs/transactions.adoc
new file mode 100644
index 0000000..3f65085
--- /dev/null
+++ b/docs/design-docs/transactions.adoc
@@ -0,0 +1,430 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+= Transactions Design Overview
+
+This document describes the implementation of multi-row transactions in Kudu. An initial design
+document was written to provide Kudu-related context and explore possible implementations. It can
+be found
+link:https://docs.google.com/document/d/1qv7Zejpfzg-HvF5azRL49g5lRLQ4437EmJ53GiupcWQ/edit#[here].
+
+Last updated:
+
+* *1.15.0*: experimental, `INSERT` and `INSERT_IGNORE` ops only. To enable, set
+  `--txn_manager_enabled` on the Kudu master and `--enable_txn_system_client_init` on Kudu tablet
+  servers, each with `--unlock_experimental_flags`. By default, transactions require at least three
+  tablet servers to host the transaction status table (a system table), but this can be adjusted by
+  setting `--txn_manager_status_table_num_replicas=1` on the master.
+
+== Terminology
+
+The introduction of transactions adds some new terminology:
+
+*Transaction participant*: a tablet that has had ops sent to it within a transaction. “Participant”
+refers to the logical tablet, in the same way that “tablet” refers to the logical partition of a
+table. “Leader participant” and “follower participant” refer to the leader and follower replicas of
+a tablet that is a participant in a transaction. A transaction may have many participants.
+
+*Transaction status table*: a distributed table with a logical schema of (`txn_id`, `commit_state`,
+`start_timestamp`, `last_update_timestamp`, `commit_timestamp`, `list_of_participant_tablet_ids`).
+This table is used to keep track of which transactions are in flight.
+
+*`TxnStatusManager`*: an entity that lives in memory alongside each tablet replica of the
+transaction status table. It is in charge of writing to the transaction status tablets and of
+transitioning the in-memory transaction states through the lifecycle of each transaction.
+
+*`TxnManager`*: a proxy between the Kudu client and the `TxnStatusManagers`, so the user client
+doesn’t have to interact directly with the transaction status table internals (e.g. opening the
+table, getting the table locations, etc.).
+
+*`TxnSystemClient`*: encapsulates all the logic required to locate and send RPCs to the
+transaction status table and to participants. It is used by the `TxnManager`, the
+`TxnStatusManager`, and transaction participants.
+
+*Transaction handle*: a public-facing, client-side object that interacts with the `TxnManager` to
+start, commit, and abort transactions, and to send heartbeats through to the `TxnStatusManagers`.
+
+== A walkthrough of the commit path
+
+Before delving into the details of each component, the diagram below depicts the transactional
+write path. It assumes that the transaction status table has the default replication factor
+(RF=3).
+
+image::design-docs_transactions.svg[Commit path]
+
+. A Kudu client sends a request to the `TxnManager` to begin a transaction.
+. The `TxnManager` has cached the highest transaction ID seen so far (this value could also be
+  propagated via heartbeats), and sends a request to the appropriate leader `TxnStatusManager` to
+  begin a transaction with the next transaction ID.
+. If the transaction status tablet accepts the transaction ID (i.e. the ID falls within an
+  acceptable hash and range for that tablet), it inserts a new status record.
+. Upon successfully persisting the new status record in the transaction status table, the
+  `TxnStatusManager` returns a response to the `TxnManager`, which now knows of a new highest
+  transaction ID.
+. The `TxnManager` returns the transaction ID to the Kudu client.
+. The Kudu client sends writes directly to the tablet servers, with the transaction ID attached to
+  every `WriteRequestPB`. The receiving participant first checks whether it has already been
+  registered with the `TxnStatusManager` for this transaction.
+. If the participant hasn’t been registered for this transaction ID, it uses the
+  `TxnSystemClient` to register itself with the appropriate `TxnStatusManager`. The
+  `TxnStatusManager` writes a participant record that includes the participant’s tablet ID.
+. The transaction participant leader replicates the client write to its followers.
+. The transaction participant returns success to the Kudu client.
+. When the user wants to commit the transaction, the Kudu client sends a request to the `TxnManager`
+  to commit the transaction.
+. The `TxnManager` sends a request to the `TxnStatusManager` to commit the transaction.
+. The `TxnStatusManager` updates its in-memory state to block further participants from
+  registering, and replicates an update to the status record indicating that the transaction has
+  begun committing.
+. The `TxnStatusManager` returns an ack to the `TxnManager`, which passes it on to the client.
+. Asynchronously, the `TxnStatusManager` sends requests to all registered participants to begin
+  committing.
+. Each participant replicates this request, indicating that it will no longer accept new write
+  requests for this transaction. Each participant returns to the `TxnStatusManager` the timestamp
+  of this replicated op, which is guaranteed to be higher than the timestamps of all of this
+  transaction’s ops on that participant.
+. The `TxnStatusManager` replicates the commit timestamp (the highest of the timestamps returned by
+  the participants) in the transaction status record. Past this step, the transaction can no longer
+  be aborted.
+. The `TxnStatusManager` sends requests to finalize the commit to all participants.
+. The request to finalize the commit is replicated to each participant’s replicas; upon applying
+  it, all of the transaction’s state is made visible on the participant.
+. The `TxnStatusManager` replicates a record indicating that the commit is complete.
+
+== Transactions system table and `TxnManagers`
+
+The `TxnManagers` are the clients’ first point of contact when operating on transactions.
+`TxnManagers` are entities that currently live on the Kudu master nodes and serve as proxies to the
+transaction status table, a distributed system table that stores metadata about every transaction in
+the cluster.
+
+`TxnManagers` are mostly stateless and primarily send requests that update the transaction status
+table. The exception is that `TxnManagers` keep track of the highest transaction ID seen so far,
+which allows them to request that new transactions start with a specific transaction ID. As a
+result, users do not have to supply a transaction ID when starting a new transaction, and
+partitions of the transaction status table do not need to coordinate among themselves to determine
+the next transaction ID.
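To illustrate the ID assignment described above, here is a minimal, single-process sketch; it is not Kudu's implementation. `StatusTabletSketch` and `TxnManagerSketch` are hypothetical names, a single status tablet stands in for the whole partitioned table, RPCs are plain function calls, and the rule that a status tablet rejects IDs not higher than the highest it has accepted (reporting that highest ID so the caller can catch up) is an assumption made for illustration.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for one transaction status tablet. Assumption: it
// only accepts transaction IDs higher than any ID it has already accepted.
class StatusTabletSketch {
 public:
  // On success, records the new transaction as OPEN and returns true.
  // Otherwise returns false. Either way, '*highest_seen' is set to the
  // highest transaction ID this tablet has accepted.
  bool BeginTransaction(int64_t txn_id, int64_t* highest_seen) {
    if (txn_id <= highest_seen_) {
      *highest_seen = highest_seen_;
      return false;
    }
    highest_seen_ = txn_id;
    states_[txn_id] = "OPEN";
    *highest_seen = highest_seen_;
    return true;
  }

  const std::map<int64_t, std::string>& states() const { return states_; }

 private:
  int64_t highest_seen_ = -1;
  std::map<int64_t, std::string> states_;
};

// Hypothetical TxnManager: caches the highest transaction ID it knows of and
// proposes the next one, so callers never have to choose an ID themselves.
class TxnManagerSketch {
 public:
  explicit TxnManagerSketch(StatusTabletSketch* tablet) : tablet_(tablet) {}

  int64_t BeginTransaction() {
    while (true) {
      const int64_t proposed = highest_txn_id_ + 1;
      int64_t highest_seen = -1;
      if (tablet_->BeginTransaction(proposed, &highest_seen)) {
        highest_txn_id_ = proposed;
        return proposed;
      }
      // The cached ID was stale (e.g. another TxnManager started transactions
      // in the meantime): catch up to the reported highest ID and retry.
      highest_txn_id_ = highest_seen;
    }
  }

 private:
  StatusTabletSketch* tablet_;
  int64_t highest_txn_id_ = -1;
};
```

With two such managers sharing one tablet, a manager whose cached ID is stale has its first proposal rejected, catches up to the reported highest ID, and succeeds on the next attempt; neither users nor the status tablet partitions have to pick IDs.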
+
+// TODO: file ticket about improving transaction ID assignment algorithm
+
+The `TxnManager` service exposes the following RPC endpoints to clients:
+
+* `BeginTransaction() => { txn_id, keepalive_millis }`: Starts a transaction in the `OPEN` state
+* `CommitTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` to `COMMIT_IN_PROGRESS`
+* `AbortTransaction(txn_id) => {}`: Transitions a transaction from `OPEN` or `COMMIT_IN_PROGRESS` to
+  `ABORT_IN_PROGRESS`
+* `GetTransactionState(txn_id) => { state }`: Returns the current state of a transaction
+* `KeepTransactionAlive(txn_id) => {}`: Signals to the `TxnStatusManager` that a transaction should
+  be kept alive
+
+Each endpoint corresponds to a user-facing client API in the C++ and Java clients, with the
+exception of the keep-alive endpoint, since keep-alive heartbeating is handled automatically by
+transaction handles.
+
+Under the hood, the `TxnManager` wraps a `KuduClient` as the `TxnSystemClient`, which it uses to
+lazily create, open, and alter the transaction status table, as well as to send requests to its
+partitions. By default, the table is created only once the first `BeginTransaction()` request is
+received. This lazy initialization of the transaction status table can be controlled via the
+`--txn_manager_lazily_initialized` master flag; when it is disabled, the `TxnManagers` periodically
+attempt to create the table after starting up.
+
+The transaction status table is currently partitioned by range and starts off with a single range.
+As new transactions are added and the existing partitions fill up, the `TxnManagers` automatically
+add new ranges to the table. The size of these partitions can be controlled via the
+`--txn_manager_status_table_range_partition_span` master flag.
+
+== `TxnStatusManager`
+
+Requests to a given transaction status table partition are received and managed by the
+`TxnStatusManager`, which keeps track of the transactions and participants stored in the tablet.
+Tablet replicas of tables of the `TXN_STATUS_TABLE` type are expected to have the following schema:
+
+[source,sql]
+----
+CREATE TABLE kudu_system.kudu_transactions (
+  txn_id INT64,
+  entry_type INT8,
+  identifier STRING,
+  metadata STRING,
+  PRIMARY KEY (txn_id, entry_type, identifier)
+) PARTITION BY RANGE (txn_id)
+(
+  PARTITION 0 <= VALUES < 1000000
+)
+----
+
+This schema allows a tablet replica to initialize its `TxnStatusManager` by reading the tablet’s
+records. The `TxnStatusManager` maintains an in-memory map of transaction IDs to transaction
+metadata, which it uses to serve or update the current state of each transaction. Its primary role
+is to manage the lifecycle of a transaction, moving it from state to state through the following
+allowed state transitions:
+
+      BeginCommit                      FinalizeCommit                        CompleteCommit
+OPEN ------------> COMMIT_IN_PROGRESS ---------------> FINALIZE_IN_PROGRESS ---------------> COMMITTED
+
+      BeginCommit                      BeginAbort                     FinalizeAbort
+OPEN ------------> COMMIT_IN_PROGRESS -----------> ABORT_IN_PROGRESS --------------> ABORTED
+
+      AbortTxn                     FinalizeAbort
+OPEN ---------> ABORT_IN_PROGRESS --------------> ABORTED
+
+The creation of an `OPEN` transaction and the transitions to either `COMMIT_IN_PROGRESS` or
+`ABORT_IN_PROGRESS` are initiated by users and are synchronous. All other transitions are performed
+automatically by `TxnStatusManager` background tasks, and clients can monitor their completion by
+getting the transaction’s status.
+
+To update a given transaction, the `TxnStatusManager` first writes the changes to disk without yet
+applying them to its in-memory state. Once the update has been replicated and persisted to the
+transaction status tablet’s replicas, the state transition is committed in memory and made visible
+to users.
+
+=== Leadership protection
+
+Much like the `CatalogManager`, only the leader `TxnStatusManager` is allowed to perform operations.
+This is enforced by taking a shared lock on the in-memory Raft term, which is set when a
+`TxnStatusManager` becomes leader of a term.
When a `TxnStatusManager` receives a request, it checks
+whether the current term is the same as the term set when it last became leader; if the terms do
+not match, the request is rejected, signaling that leadership has changed.
+
+In the event of a network partition where a leader has become stale but still thinks it is leader,
+updates to the transaction status table are protected by the underlying tablet’s write path: the
+Raft protocol blocks the attempt to write to the table, and an error is returned, signaling that
+leadership has changed.
+
+To ensure consistency of in-memory state across term changes, once a `TxnStatusManager` is elected
+leader, it reads the contents of the tablet, regenerating the in-memory state of all transactions.
+
+=== Background orchestration tasks
+
+The transition from `COMMIT_IN_PROGRESS` or `ABORT_IN_PROGRESS` to the corresponding terminal
+state, as well as the orchestration of the appropriate participant RPCs, is managed by a set of
+tasks per transaction. A similar pattern is used for each transition, so only the transition from
+`COMMIT_IN_PROGRESS` to `COMMITTED` is described below.
+
+* Once the `TxnStatusManager` sets a transaction to `COMMIT_IN_PROGRESS`, it kicks off an
+  asynchronous RPC to each participant to begin committing.
+* When these RPCs return, the last one to return triggers the `TxnStatusManager` to write the
+  `FINALIZE_IN_PROGRESS` record.
+* Once that record is written, the `TxnStatusManager` kicks off an asynchronous RPC to each
+  participant to finalize the commit.
+* When these RPCs return, the last one to return triggers the `TxnStatusManager` to write the
+  `COMMITTED` record.
+
+Since only the leader `TxnStatusManager` is meant to update transaction state, at each point in the
+above sequence where it returns from doing IO or from waiting, the `TxnStatusManager` checks that
+it’s still the leader. If not, it stops the task.
Additionally, once a `TxnStatusManager` becomes
+leader, as it reads the existing transaction states from disk, it begins tasks for any transaction
+that is in a non-`OPEN`, non-terminal (i.e. not `COMMITTED`, not `ABORTED`) state.
+
+=== Heartbeating and staleness detection
+
+Clients send heartbeat messages to a `TxnStatusManager` to let it know that a transaction has not
+been abandoned; the leader `TxnStatusManager` automatically aborts abandoned transactions. The
+leader `TxnStatusManager` keeps track of the last heartbeat for each transaction in memory only,
+making heartbeating a relatively lightweight operation.
+
+Each tablet server has a background thread that periodically goes through each `TxnStatusManager`
+hosted on the server and aborts all transactions that have not received a heartbeat within a
+configured interval. Only transactions in the `OPEN` state are automatically aborted.
+
+== Transaction participants
+
+=== Transaction state machine
+
+Transaction participants keep track of local transaction state, ensuring that transactional writes
+are only accepted while the transaction is open, i.e. before it has begun committing or aborting.
+To do this, participants persistently (i.e. via Raft replication) keep track of the transaction
+state, described below:
+
+* `kInitializing`: the transaction has yet to be fully initialized. Replication may be in progress,
+  but the transaction cannot yet be considered fully open.
+* `kOpen`: the transaction is available for writes.
+* `kCommitInProgress`: the `TxnStatusManager` has signaled to the participant that the transaction
+  should begin committing. The transaction can no longer accept new writes.
+* `kCommitted`: the `TxnStatusManager` has finalized the commit. Transactional rows should be
+  visible to clients.
+* `kAborted`: the `TxnStatusManager` has begun aborting the transaction.
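As a sketch of the participant state machine, the following encodes the states above together with the expected transitions that follow them in this section. The `TxnState`, `TxnOp`, `ApplyOp`, and `AcceptsWrites` names are hypothetical rather than Kudu's participant API, replication is ignored (an op takes effect as soon as it is applied), and only the transitions listed in this section are allowed.

```cpp
// Hypothetical participant-side states, mirroring the list above.
enum class TxnState { kInitializing, kOpen, kCommitInProgress, kCommitted, kAborted };

// Hypothetical names for the replicated ops that drive participant transitions.
enum class TxnOp { BEGIN_TXN, BEGIN_COMMIT, FINALIZE_COMMIT, ABORT_TXN };

// Applies 'op' to '*state' if it is one of the expected transitions and
// returns true; otherwise leaves '*state' unchanged and returns false.
bool ApplyOp(TxnOp op, TxnState* state) {
  TxnState required;
  TxnState next;
  switch (op) {
    case TxnOp::BEGIN_TXN:
      required = TxnState::kInitializing;
      next = TxnState::kOpen;
      break;
    case TxnOp::BEGIN_COMMIT:
      required = TxnState::kOpen;
      next = TxnState::kCommitInProgress;
      break;
    case TxnOp::FINALIZE_COMMIT:
      required = TxnState::kCommitInProgress;
      next = TxnState::kCommitted;
      break;
    case TxnOp::ABORT_TXN:
      required = TxnState::kOpen;
      next = TxnState::kAborted;
      break;
    default:
      return false;
  }
  if (*state != required) return false;
  *state = next;
  return true;
}

// Transactional writes are only accepted while the transaction is open.
bool AcceptsWrites(TxnState state) { return state == TxnState::kOpen; }
```

Rejecting any op that does not match the current state is what lets a participant refuse, for example, a late write or a second `BEGIN_TXN` once `BEGIN_COMMIT` has been replicated.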
+
+The following state transitions are expected:
+
+               BEGIN_TXN         BEGIN_COMMIT                     FINALIZE_COMMIT
+kInitializing ----------> kOpen -------------> kCommitInProgress ----------------> kCommitted
+
+               BEGIN_TXN         ABORT_TXN
+kInitializing ----------> kOpen ----------> kAborted
+
+As orchestration RPCs are replicated on the participant, the corresponding transitions are applied
+to the underlying tablet, updating an in-memory registry of `TxnMetadata` objects keyed by
+transaction ID and persisting the states in the tablet metadata.
+
+// TODO: point to the change that suggests removing metadata once all in-memory stores are flushed.
+
+=== Registering participants
+
+Before a participant can take part in a transaction, it must register itself with the
+`TxnStatusManager` and replicate an op that demarcates the beginning of the transaction on all of
+its replicas.
+
+To ensure this happens, when a leader participant first receives a transactional write, it checks
+whether it has already completed these preparatory steps. It does so by keeping an in-memory map of
+“dispatchers”, one per active transaction. A participant’s dispatcher caches whether the
+participant has completed the steps and, if so, simply lets the write through to the prepare pool.
+
+Otherwise, the dispatcher temporarily queues the write request while the participant uses the
+`TxnSystemClient` to asynchronously register itself with the `TxnStatusManager`, and then
+replicates the op to begin the transaction. Once both steps complete, the queued write requests are
+submitted to the prepare threadpool. If either step returns an error, the error is passed back to
+the writer, so that retriable errors (e.g. leadership changes) cause the transactional write op to
+be retried, and non-retriable errors (e.g. an invalid transaction ID) are shown to users.
+
+Once the transaction has begun committing or aborting on the participant, the transaction’s
+dispatcher is unregistered.
Further attempts to write to the transaction may instantiate a new
+dispatcher, but registering the participant will fail, since the `TxnStatusManager` rejects the
+registration.
+
+=== Participant commit and MVCC
+
+The “commit” condition for transactions differs from that of regular write ops, which only need to
+be considered “applied” to be visible to users. The goals for commit are:
+
+* Stop accepting writes once a `BEGIN_COMMIT` op has been replicated on a participant.
+* Only show results that have been committed, as indicated by the replication of a `FINALIZE_COMMIT`
+  op on a participant, which contains a commit timestamp selected by the `TxnStatusManager` after
+  receiving all `BEGIN_COMMIT` op responses.
+
+To accomplish this, when a `BEGIN_COMMIT` op finishes replicating, the associated MVCC op is not
+marked as applied. Instead, the participant’s `Txn` keeps track of the `BEGIN_COMMIT` MVCC op
+timestamp, and the `BEGIN_COMMIT` MVCC op is marked as applied only once a `FINALIZE_COMMIT` op
+finishes replicating.
+
+The `TxnStatusManager` selects the commit timestamp to be the highest of all participants’
+`BEGIN_COMMIT` op timestamps, so a transaction’s commit timestamp is never lower than any of its
+`BEGIN_COMMIT` timestamps. As a result, when serving a scan at time t, it is sufficient to wait
+until all ops before t are applied.
+
+When reading rows, the commit condition depends on the kind of snapshot being used:
+
+* *Timestamp (as in `READ_AT_SNAPSHOT`, `READ_YOUR_WRITES`, diff scan)*: The transaction is
+  considered committed if the `TxnMetadata` has both a `BEGIN_COMMIT` op that has been applied and a
+  commit timestamp that falls within the range of the snapshot.
+* *Latest (as in `READ_LATEST`)*: The transaction is considered committed if the `TxnMetadata` has a
+  `BEGIN_COMMIT` op that has been applied, since we only apply the `BEGIN_COMMIT` op after
+  replicating the `FINALIZE_COMMIT` op.
+
+=== Transactional MemRowSets
+
+In addition to the single MRS that tablets traditionally have, tablets now maintain a map from
+uncommitted transaction ID to dirty MRS, and a set of committed MRSs that were inserted into as part
+of transactions. Each such MRS has a shared reference to a `TxnMetadata` instance that is maintained
+as part of the `TabletMetadata`.
+
+When a transaction begins on a participant, a transactional MRS is created for it. Transactional
+write ops first check all DRSs for row presence, then the main, non-transactional MRS, and finally
+attempt to insert into the transactional MRS. As `FINALIZE_COMMIT` ops are applied, uncommitted MRSs
+are moved to the set of committed MRSs.
+
+When scanning a transactional MRS and evaluating whether a base insert is relevant to a given scan,
+Kudu checks whether the MRS’s `TxnMetadata` should be considered committed in the given MVCC
+snapshot. Updates to the base inserts would be evaluated as usual, with visibility determined by
+whether the updates have been applied (updates are not yet supported).
+
+Transactional MemRowSets are not flushed to disk until they are fully committed, at which point the
+memory usage of all committed MRSs is considered together when deciding whether to flush. When
+flushing, all committed MRSs are taken as the flush input, similar to a merge compaction, and DRSs
+are written as though multiple MRSs were being compacted. On-disk timestamps are written as normal,
+using the rows’ commit timestamps, which removes the need to consult transaction metadata when
+evaluating visibility in subsequent scans.
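The visibility rules above can be sketched as follows. This is a simplified, hypothetical model rather than Kudu's `TxnMetadata` or MRS classes: each transactional MRS is reduced to a list of inserted keys plus a shared pointer to its transaction's metadata, the DRS and main-MRS presence checks are omitted, and a timestamp snapshot is modeled as a half-open range `[start, end)` of commit timestamps, which is an assumption about how "falls within the range of the snapshot" is evaluated.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <vector>

using Timestamp = uint64_t;

// Hypothetical, simplified stand-in for a transaction's TxnMetadata.
struct TxnMetadataSketch {
  bool begin_commit_applied = false;   // BEGIN_COMMIT MVCC op marked applied
  std::optional<Timestamp> commit_ts;  // set when FINALIZE_COMMIT is applied
};

// Assumption: a timestamp snapshot is the half-open range [start, end).
struct SnapshotRange {
  Timestamp start;
  Timestamp end;
};

// Timestamp snapshots (READ_AT_SNAPSHOT, READ_YOUR_WRITES, diff scans).
bool CommittedInSnapshot(const TxnMetadataSketch& txn, const SnapshotRange& snap) {
  return txn.begin_commit_applied && txn.commit_ts.has_value() &&
         *txn.commit_ts >= snap.start && *txn.commit_ts < snap.end;
}

// READ_LATEST: BEGIN_COMMIT is only applied after FINALIZE_COMMIT replicates.
bool CommittedForLatest(const TxnMetadataSketch& txn) { return txn.begin_commit_applied; }

// A transactional MRS, reduced to inserted keys plus shared txn metadata.
struct TxnMemRowSet {
  std::shared_ptr<TxnMetadataSketch> txn = std::make_shared<TxnMetadataSketch>();
  std::vector<int64_t> keys;
};

class TabletSketch {
 public:
  void BeginTxn(int64_t txn_id) { uncommitted_[txn_id] = std::make_shared<TxnMemRowSet>(); }

  void Insert(int64_t txn_id, int64_t key) { uncommitted_.at(txn_id)->keys.push_back(key); }

  // Applying FINALIZE_COMMIT: record the commit timestamp, mark BEGIN_COMMIT
  // as applied, and move the MRS to the committed set.
  void FinalizeCommit(int64_t txn_id, Timestamp commit_ts) {
    auto it = uncommitted_.find(txn_id);
    if (it == uncommitted_.end()) return;
    it->second->txn->commit_ts = commit_ts;
    it->second->txn->begin_commit_applied = true;
    committed_.push_back(it->second);
    uncommitted_.erase(it);
  }

  std::vector<int64_t> ScanAtSnapshot(const SnapshotRange& snap) const {
    return CollectVisible(
        [&snap](const TxnMetadataSketch& m) { return CommittedInSnapshot(m, snap); });
  }

  std::vector<int64_t> ScanLatest() const { return CollectVisible(CommittedForLatest); }

 private:
  // A base insert is visible only if its MRS's transaction counts as committed.
  template <typename Pred>
  std::vector<int64_t> CollectVisible(Pred committed) const {
    std::vector<int64_t> out;
    auto visit = [&](const TxnMemRowSet& mrs) {
      if (committed(*mrs.txn)) out.insert(out.end(), mrs.keys.begin(), mrs.keys.end());
    };
    for (const auto& entry : uncommitted_) visit(*entry.second);
    for (const auto& mrs : committed_) visit(*mrs);
    return out;
  }

  std::map<int64_t, std::shared_ptr<TxnMemRowSet>> uncommitted_;
  std::vector<std::shared_ptr<TxnMemRowSet>> committed_;
};
```

Because every row in an MRS shares one metadata object, flipping that object at `FINALIZE_COMMIT` makes all of the transaction's inserts visible together, which is the all-or-nothing behavior the section describes.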
+
+=== Locking and deadlock protection
+
+Currently, per-transaction partition-level locking is supported, in tandem with per-op row-level
+locking. Each tablet may participate in at most one transaction at a time.
+
+To avoid deadlocks, lock conflicts are resolved with a wait-die scheme based on transaction IDs. If
+a transaction attempts to acquire a partition lock that is held by a transaction with a lower
+transaction ID, the requesting transaction aborts itself: the tablet server sends a best-effort
+request to the `TxnStatusManager` to abort it, i.e. the requesting transaction “dies”. If instead
+the requesting transaction has a lower transaction ID than the lock holder, the op should be
+retried, i.e. the requesting transaction “waits”.
+
+To ensure rows are properly locked in the presence of both transactional and non-transactional
+workloads, all non-transactional write ops also take the partition lock, using the maximum
+transaction ID. This means that all transactional write ops will wait for non-transactional writes
+to finish, and non-transactional writes will abort in the presence of a multi-row transaction
+inserting rows into the same tablet.
+
+== Transactions client API
+
+Both the C++ and Java clients leverage the existing session-based API that users have come to know.
+With transactions, however, there is also the concept of transaction handles. A transaction handle
+initiates a transaction, creates transactional sessions, orchestrates the commit or rollback of the
+transaction, and signals activity on the transaction via automatic heartbeating, which prevents the
+`TxnStatusManager` from culling it as stale.
+
+[source,c++]
+----
+shared_ptr<KuduTransaction> txn;
+KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
+shared_ptr<KuduSession> session;
+KUDU_RETURN_NOT_OK(txn->CreateSession(&session));
+// ... insert rows to 'session'
+KUDU_RETURN_NOT_OK(session->Flush());
+KUDU_RETURN_NOT_OK(txn->Commit());
+----
+
+[source,java]
+----
+KuduTransaction txn = client.newTransaction();
+KuduSession session = txn.newKuduSession();
+// ... insert rows to 'session'
+session.flush();
+txn.commit();
+----
+
+=== Heartbeating
+
+Under the hood, each newly created transaction handle, while kept in scope, also heartbeats through
+to the `TxnStatusManagers` to signal activity for the transaction, preventing it from being culled
+by the transaction staleness checkers. Client applications do not need to heartbeat explicitly.
+
+=== Serialization
+
+Since we expect distributed actors to participate in a given transaction, Kudu also exposes a way
+to transmit transaction handles across processes. Rather than exposing internal details like the
+transaction ID, we serialize and deserialize a `TxnTokenPB` that contains metadata about the
+transaction.
+
+[source,c++]
+----
+string txn_token;
+shared_ptr<KuduTransaction> txn;
+KUDU_RETURN_NOT_OK(client->NewTransaction(&txn));
+KUDU_RETURN_NOT_OK(txn->Serialize(&txn_token));
+
+// Possibly in another process, with its own KuduClient instance.
+shared_ptr<KuduTransaction> same_txn;
+KUDU_RETURN_NOT_OK(KuduTransaction::Deserialize(client, txn_token, &same_txn));
+----
+
+[source,java]
+----
+KuduTransaction txn = client.newTransaction();
+byte[] txnToken = txn.serialize();
+// Possibly in another process, with its own AsyncKuduClient instance.
+KuduTransaction sameTxn = KuduTransaction.deserialize(txnToken, asyncClient);
+----
+
+Since we typically expect a single driver of a transaction and multiple actors participating in
+it, deserialized transaction handles do not heartbeat by default. The expectation is that the
+driver continues heartbeating until the transaction is complete. This behavior can be changed by
+passing customized `SerializationOptions` when serializing the handle.
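To show how heartbeating, serialization, and staleness detection fit together, here is a minimal sketch with a simulated clock. `StalenessTrackerSketch` and `TxnHandleSketch` are hypothetical names, not Kudu classes; the keep-alive interval plays the role of the `keepalive_millis` value returned by `BeginTransaction()`, and the sweep follows the staleness-detection rule that only `OPEN` transactions are automatically aborted. A driver handle heartbeats, while a deserialized handle, by default, does not.

```cpp
#include <cstdint>
#include <map>

using Millis = int64_t;

enum class TxnStatus { OPEN, COMMIT_IN_PROGRESS, ABORT_IN_PROGRESS };

// Hypothetical in-memory heartbeat bookkeeping on a leader TxnStatusManager.
class StalenessTrackerSketch {
 public:
  explicit StalenessTrackerSketch(Millis keepalive_millis)
      : keepalive_millis_(keepalive_millis) {}

  void Begin(int64_t txn_id, Millis now) { txns_[txn_id] = Entry{TxnStatus::OPEN, now}; }

  // Heartbeats only touch in-memory state; nothing is replicated.
  void KeepAlive(int64_t txn_id, Millis now) {
    auto it = txns_.find(txn_id);
    if (it != txns_.end()) it->second.last_heartbeat = now;
  }

  void BeginCommit(int64_t txn_id) { txns_.at(txn_id).status = TxnStatus::COMMIT_IN_PROGRESS; }

  // Periodic sweep: aborts OPEN transactions whose last heartbeat is older
  // than the keep-alive interval. Returns the number of aborted transactions.
  int AbortStale(Millis now) {
    int aborted = 0;
    for (auto& entry : txns_) {
      Entry& e = entry.second;
      if (e.status == TxnStatus::OPEN && now - e.last_heartbeat > keepalive_millis_) {
        e.status = TxnStatus::ABORT_IN_PROGRESS;
        ++aborted;
      }
    }
    return aborted;
  }

  TxnStatus status(int64_t txn_id) const { return txns_.at(txn_id).status; }

 private:
  struct Entry {
    TxnStatus status;
    Millis last_heartbeat;
  };
  Millis keepalive_millis_;
  std::map<int64_t, Entry> txns_;
};

// Hypothetical client-side handle. A driver handle sends keepalives; a
// deserialized handle does not by default.
struct TxnHandleSketch {
  int64_t txn_id;
  bool send_keepalives;

  // Called periodically while the handle is in scope.
  void Tick(StalenessTrackerSketch* tracker, Millis now) const {
    if (send_keepalives) tracker->KeepAlive(txn_id, now);
  }
};
```

Once the driver stops heartbeating, ticks from a deserialized handle do not keep the transaction alive, so the next sweep aborts it; a transaction that has already begun committing is left alone by the sweep.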
