This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 63ec070465 Add initial Accord Overview Doc
63ec070465 is described below

commit 63ec070465856ed6808a72d95b4e20c248fd2c91
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Mar 4 09:28:22 2025 +0100

    Add initial Accord Overview Doc
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20393.
---
 .../cassandra/pages/developing/accord/index.adoc   | 360 +++++++++++++++++++++
 doc/modules/cassandra/pages/developing/index.adoc  |   1 +
 2 files changed, 361 insertions(+)

diff --git a/doc/modules/cassandra/pages/developing/accord/index.adoc b/doc/modules/cassandra/pages/developing/accord/index.adoc
new file mode 100644
index 0000000000..8320b49a0a
--- /dev/null
+++ b/doc/modules/cassandra/pages/developing/accord/index.adoc
@@ -0,0 +1,360 @@
+== Accord Intro
+
+This document is intended to facilitate a quick dive into the Accord and
+Cassandra integration code for anyone interested in the project. Readers
+should, at the very least, be closely familiar with Single-Decree Paxos
+and fluent in consensus terminology. Familiarity with the Accord protocol
+itself, or with similar protocols such as EPaxos, TAPIR, Janus, or Tempo,
+is also useful.
+
+Accord code is logically split into a local part and a coordinator part.
+Coordination code is responsible for coordinating (invoking) the client
+query and driving it through the Accord state machine, and contains all
+commands and utilities for tracking and retrying their state. Node-local
+code contains utilities for keeping a record of replica state and for
+facilitating local execution (i.e. responding to coordinator queries).
+
+There are _many_ enums in Accord. They’re extremely useful for
+understanding the state machine of each of the components.
+
+Cassandra Integration implements interfaces provided by Accord, and
+plugs in messaging, serialization, CQL, concurrency/execution, on-disk
+state management, and stable storage (i.e. Cassandra tables).
+
+When a request comes from the client, broadly speaking, it gets parsed
+and turned into a `TransactionStatement`. A `TransactionStatement`
+contains updates, selects, assignments, and conditions intended for
+atomic/transactional execution. These statements are translated into
+Accord commands (i.e. `Read`, `Write`, or `Update`), and form an Accord
+transaction (`Txn`). The transaction is executed, yielding a `TxnResult`
+that can be returned to the client.
+
+== Coordinator Side
+
+=== Accord Protocol Basics
+
+The coordinator allocates a globally unique transaction ID (`TxnId`) for
+the transaction, and begins coordination (see `CoordinateTransaction`).
+Here, the coordinator performs initial rounds of `PreAccept` and `Accept`
+until agreement is reached about when the transaction should execute.
+Coordinated query execution starts with a `PreAccept` message, which
+contains the transaction definition and routing information.
+
+On the replica, each Accord message first lands in
+`AccordVerbHandler`, which handles _all_ Accord messages. The replica
+determines whether it is aware of the _epoch_ specified by the
+transaction coordinator. Messages for future epochs are parked until
+the epoch becomes active on the node; messages for known epochs are
+submitted to their corresponding command stores (think: local shards).
+The replica applies the message locally, changing its local state and
+producing a response for the coordinator. The coordinator collects
+replica responses and continues driving the transaction through the
+execution state machine.
+
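The parking behaviour described above can be illustrated with a minimal Java sketch. This is not the actual `AccordVerbHandler` code: `EpochGateSketch`, `receive`, and `onEpochActive` are hypothetical names, messages are plain strings, and the `Consumer` merely stands in for submission to command stores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names exist in Accord.
final class EpochGateSketch {
    private long currentEpoch;
    // Stand-in for "submit to the corresponding command store".
    private final Consumer<String> commandStores;
    // Messages for epochs this node has not seen yet, ordered by epoch.
    private final TreeMap<Long, List<String>> parked = new TreeMap<>();

    EpochGateSketch(long currentEpoch, Consumer<String> commandStores) {
        this.currentEpoch = currentEpoch;
        this.commandStores = commandStores;
    }

    // Known epoch: dispatch immediately. Future epoch: park the message.
    void receive(long epoch, String message) {
        if (epoch > currentEpoch) {
            parked.computeIfAbsent(epoch, e -> new ArrayList<>()).add(message);
        } else {
            commandStores.accept(message);
        }
    }

    // Called once an epoch becomes active locally; releases everything
    // that was parked for that epoch or any earlier one.
    void onEpochActive(long epoch) {
        currentEpoch = Math.max(currentEpoch, epoch);
        NavigableMap<Long, List<String>> ready = parked.headMap(currentEpoch, true);
        for (List<String> messages : ready.values()) {
            messages.forEach(commandStores);
        }
        ready.clear(); // headMap is a view, so this also removes them from 'parked'
    }

    public static void main(String[] args) {
        EpochGateSketch gate = new EpochGateSketch(1, m -> System.out.println("dispatch " + m));
        gate.receive(1, "PreAccept(txn1)"); // dispatched right away
        gate.receive(2, "PreAccept(txn2)"); // parked until epoch 2 is active
        gate.onEpochActive(2);              // txn2 is dispatched now
    }
}
```

The sorted map keeps release order by epoch, which is a design choice of this sketch rather than a statement about the real implementation.
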
+Every transaction has a home key, a global value that defines the home
+shard, the one tasked with ensuring the transaction is finished. The home
+key is chosen arbitrarily: it is either the first key the coordinator
+owns, or it is picked completely at random.
+
+== Replica Side
+
+=== CommandStore
+
+`Command` is a unit of Accord _metadata_ that relates to a specific
+operation, as opposed to `Message`, which is an _instruction_ sent by
+the coordinator to the replica for execution that _changes_ this command
+state. `Command` does _not_ hold the state of an entire transaction, but
+rather the _part_ of the transaction executed on a particular shard.
+The _coordinator_ is responsible for executing the entirety of the
+transaction; `Command`s are just local execution states.
+
+Commands are held by a Command _Store_, a single-threaded internal shard
+of Accord transaction metadata. It holds the state required for command
+execution, and executes commands sequentially. For command execution,
+`CommandStore` creates a `SafeCommandStore`, a version of `CommandStore`
+that has exclusive access to the store for the duration of that
+execution.
+
+Roughly speaking, you can think of the relation between `CommandStore`
+and `SafeCommandStore` as:
+
+....
+SafeCommandStore safeStore = commandStore.beginOperation(context);
+try {
+   message.apply(safeStore);
+}
+finally {
+   commandStore.completeOperation(safeStore);
+}
+....
+
+In other words, `CommandStore` collects the `PreLoadContext`: the state
+that must be in memory for command execution (possible dependencies,
+such as `TxnId`s and `Key`s of commands, but also the `CommandsForKeys`
+that will be needed during execution). Once the context is collected and
+the command’s turn to execute on the command store comes, a _safe_
+command store is created and passed to the command.
+
+Any executing operation may require changes to command store state. For
+this, `SafeCommandStore` creates a special version of command state,
+`SafeCommand` and `SafeCommandsForKey` that can be updated during
+execution. Naturally, either _all_ of the states changed during
+operation execution will become visible, or none of them will. In order
+to ensure transactional integrity, changes to commands are tracked and
+are recorded into `Journal` for crash-recovery. `ProgressLog` and
+`CommandsForKey` are up
+
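The all-or-nothing visibility described above can be sketched in Java as follows. This is a simplified illustration under stated assumptions, not Accord's implementation: `AtomicStoreSketch` and `SafeView` are hypothetical names, command state is reduced to a status string per transaction id, and journalling is only indicated by a comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names exist in Accord.
final class AtomicStoreSketch {
    // Stand-in for a "safe" store: buffers the updates made by one operation.
    static final class SafeView {
        private final Map<String, String> committed;
        private final Map<String, String> pending = new HashMap<>();

        SafeView(Map<String, String> committed) { this.committed = committed; }

        String status(String txnId) {
            return pending.containsKey(txnId) ? pending.get(txnId) : committed.get(txnId);
        }

        void update(String txnId, String status) { pending.put(txnId, status); }
    }

    private final Map<String, String> statusByTxnId = new HashMap<>();
    // One thread per store, so operations never interleave.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Runs the operation on the store thread and waits for it.
    // Returns false, publishing nothing, if the operation throws.
    boolean execute(Consumer<SafeView> operation) {
        Callable<Boolean> task = () -> {
            SafeView safe = new SafeView(statusByTxnId);
            operation.accept(safe);
            // A real store would record the changes in a journal at this point.
            statusByTxnId.putAll(safe.pending); // publish all changes together
            return true;
        };
        try {
            return executor.submit(task).get();
        } catch (Exception e) {
            return false; // sketch only: failures are reported as "not applied"
        }
    }

    // Reads also go through the store thread, so they see a consistent state.
    String status(String txnId) {
        Callable<String> read = () -> statusByTxnId.get(txnId);
        try {
            return executor.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() { executor.shutdown(); }

    public static void main(String[] args) {
        AtomicStoreSketch store = new AtomicStoreSketch();
        store.execute(safe -> safe.update("txn1", "PreAccepted"));
        store.execute(safe -> {
            safe.update("txn1", "Applied");
            throw new IllegalStateException("simulated failure");
        });
        System.out.println(store.status("txn1")); // prints PreAccepted
        store.shutdown();
    }
}
```

Buffering updates in the view and publishing them in one step is one simple way to get the "all or none" property; the failed second operation in `main` leaves the earlier status untouched.
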
+On the Cassandra side, concurrent execution is controlled by `AccordTask`,
+which contains cache loading logic and persistence callbacks. Since
+Accord may potentially hold a large number of command states in memory,
+their states may be _shrunk_ to their binary representation to save some
+memory, or they can get fully evicted. This also means that `AccordTask`
+will have to reload relevant dependencies from preload context before
+command execution can begin.
+
+=== AsyncChain, AccordTask, AccordExecutor
+
+Accord is designed for high concurrency, and most things are constructed
+as asynchronous chains. The `AsyncChain` API is very similar to that of
+Java futures, but has several convenience methods that make execution on
+multiple executors (think: command stores, loaders) simpler.
+
+Each `CommandStore` has its own `AccordExecutor`. For the purpose of
+this document you may consider it as a single-threaded executor.
+`AccordExecutor` keeps track of tasks in different states, primarily:
+
+* `WAITING_TO_LOAD` - executor has a maximum number of concurrent load
+tasks. If the number of in-progress loads exceeds this number, all
+subsequently added loads will go into the waiting to load queue.
+* `LOADING` - tasks for which dependencies are being loaded.
+`CommandsForKeys` are paged in from the auxiliary table, while `Command`
+states are loaded directly from the `Journal`.
+* `WAITING_TO_RUN` / `RUNNING` / `FINISHED` - these three are
+self-explanatory; once dependencies are loaded, the task is ready to run;
+when its turn comes, it transitions to the running state, and once it’s
+done, it’s finished.
+
+There are several other states, which you can find in
+`AccordTask$State`. It is worth mentioning that Accord tasks are
+_cancellable_. Tasks that timed out before execution, have been
+preempted, or should not run for other reasons can and will be
+cancelled. Tasks transition between different `AccordExecutor` queues
+depending on their execution states.
+
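The queue transitions listed above can be sketched in Java as follows. Only the state names come from the text; `TaskQueueSketch`, `submit`, `onLoaded`, and `runNext` are hypothetical, cancellation and the other states are left out, and loading is modelled as an external callback rather than real I/O.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration only; this is not AccordExecutor.
final class TaskQueueSketch {
    enum State { WAITING_TO_LOAD, LOADING, WAITING_TO_RUN, RUNNING, FINISHED }

    static final class Task {
        final String name;
        State state;
        Task(String name) { this.name = name; }
    }

    private final int maxConcurrentLoads;
    private int loading; // number of tasks currently in LOADING
    private final Queue<Task> waitingToLoad = new ArrayDeque<>();
    private final Queue<Task> waitingToRun = new ArrayDeque<>();

    TaskQueueSketch(int maxConcurrentLoads) { this.maxConcurrentLoads = maxConcurrentLoads; }

    // New tasks start loading if a load slot is free, otherwise they queue up.
    Task submit(String name) {
        Task task = new Task(name);
        if (loading < maxConcurrentLoads) {
            startLoad(task);
        } else {
            task.state = State.WAITING_TO_LOAD;
            waitingToLoad.add(task);
        }
        return task;
    }

    private void startLoad(Task task) {
        loading++;
        task.state = State.LOADING;
    }

    // Called when a task's dependencies are in memory; frees a load slot.
    void onLoaded(Task task) {
        loading--;
        task.state = State.WAITING_TO_RUN;
        waitingToRun.add(task);
        Task next = waitingToLoad.poll();
        if (next != null) startLoad(next);
    }

    // Runs the next ready task to completion; returns null if none is ready.
    Task runNext() {
        Task task = waitingToRun.poll();
        if (task == null) return null;
        task.state = State.RUNNING;
        // The task body would execute here.
        task.state = State.FINISHED;
        return task;
    }

    public static void main(String[] args) {
        TaskQueueSketch executor = new TaskQueueSketch(1);
        Task a = executor.submit("a");
        Task b = executor.submit("b");
        System.out.println(a.state + " " + b.state); // LOADING WAITING_TO_LOAD
        executor.onLoaded(a);
        System.out.println(a.state + " " + b.state); // WAITING_TO_RUN LOADING
        executor.runNext();
        System.out.println(a.state);                 // FINISHED
    }
}
```
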
+In Accord, all tasks have to be executed in strict order, and a task
+can’t execute before its dependencies have executed, else there’s no
+guarantee of strict order. Tasks are notified about dependency readiness
+using `NotificationSink`, which updates the task’s `WaitingOn`
+collection. `WaitingOn` is responsible for registering listeners with
+`CommandStore` if dependencies need to be executed before the current
+task can.
+
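The dependency-driven ordering described above can be illustrated with a small Java sketch. It assumes a much simpler model than Accord's: transaction ids are strings, "executing" just appends to a list, and `WaitingOnSketch`, `register`, and `executionOrder` are hypothetical names rather than the real `WaitingOn`, `NotificationSink`, or listener APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Accord.
final class WaitingOnSketch {
    // txnId -> dependencies that have not executed yet
    private final Map<String, Set<String>> waitingOn = new HashMap<>();
    // dependency -> transactions to notify once it has executed
    private final Map<String, List<String>> listeners = new HashMap<>();
    private final Set<String> executed = new HashSet<>();
    private final List<String> order = new ArrayList<>();

    // Registers a transaction; it runs immediately if nothing blocks it.
    void register(String txnId, Set<String> deps) {
        Set<String> pending = new HashSet<>(deps);
        pending.removeAll(executed);
        if (pending.isEmpty()) {
            execute(txnId);
            return;
        }
        waitingOn.put(txnId, pending);
        for (String dep : pending) {
            listeners.computeIfAbsent(dep, d -> new ArrayList<>()).add(txnId);
        }
    }

    // Executes a transaction and notifies everything that was waiting on it.
    private void execute(String txnId) {
        executed.add(txnId);
        order.add(txnId);
        List<String> waiters = listeners.remove(txnId);
        if (waiters == null) return;
        for (String waiter : waiters) {
            Set<String> pending = waitingOn.get(waiter);
            pending.remove(txnId);
            if (pending.isEmpty()) {
                waitingOn.remove(waiter);
                execute(waiter);
            }
        }
    }

    List<String> executionOrder() { return new ArrayList<>(order); }

    public static void main(String[] args) {
        WaitingOnSketch sketch = new WaitingOnSketch();
        sketch.register("t3", Set.of("t1", "t2"));
        sketch.register("t2", Set.of("t1"));
        sketch.register("t1", Set.of());
        System.out.println(sketch.executionOrder()); // [t1, t2, t3]
    }
}
```
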
+`WaitingOn`, `NotificationSink` and `LocalListeners` registered with
+`CommandStore` can be thought of as the ``happy path'' of execution: the
+case when the coordinator makes timely progress changing command states.
+If the coordinator _fails_ to make progress, `ProgressLog` kicks in after
+the registered deadline.
+
+=== ProgressLog
+
+The progress log is responsible for ensuring progress in transactions
+that aren’t making any. It does two things:
+
+* Fetches data from peers via `WaitingState`. Depending on the state of
+the transaction, it may trigger a fetch of a subset of the required
+dependencies from peers via `FetchData`. For example, when we haven’t
+received `Apply`, but we’re `ReadyToExecute`.
+* Triggers recovery via `HomeState`. The progress log may also
+autonomously decide that a transaction which hasn’t been
+decided/executed (and otherwise should be able to do so) should have the
+recovery protocol invoked. In other words, if _coordination_ of the
+transaction is stuck (i.e. progress has stalled because of the
+transaction coordinator rather than because of dependencies missing
+locally), the progress log may trigger recovery via `MaybeRecover`.
+
+=== Command
+
+`Command` is the core building block of the Accord local state.
+`Message`s, such as `PreAccept`, `Propose`, `Accept`, and many others,
+change `Command` state for a given store during execution. A `Command`
+consists of:
+
+* `SaveStatus` - node-local command status
+* `Participants` - core routing information required for transaction.
+Keys or Ranges participating in the transaction.
+* Timestamps:
+** `ExecuteAt` - the timestamp at which this transaction has been decided
+to execute. May differ from its `TxnId` if a higher ballot was witnessed
+during the `PreAccept` phase, i.e. if any conflicts were discovered.
+** `ExecutesAtLeast` - only relevant for `WaitingOnWithExecutesAtLeast`
+** Ballots for coordinating within a specific `TxnId`:
+*** `Promised` - a non-zero ballot can be set as a result of recovery; a
+recovery coordinator (see Recovery Protocol in Accord paper for details)
+is picking its own globally unique ballot for re-proposal.
+*** `AcceptedOrCommitted` - same as `Promised` (i.e. a non-zero ballot
+is set as a result of recovery), except for later protocol stages.
+* `PartialTxn` - shard-relevant definition of the transaction.
+* Dependencies:
+** `PartialDeps` - a collection of transaction dependencies, keyed by
+the key or range on which they were adopted.
+** `WaitingOn` - a subset of the above dependencies this command needs
+to wait on.
+* `Writes` - a collection of data to write to one or more stores
+* `Result` - a result to be returned to a client, or be stored in a
+node’s command state. Effectively unused in Cassandra implementation.
+
+=== CommandsForKey (CFK)
+
+`CommandsForKey` is a specialised collection for efficiently
+representing and querying everything Accord needs for making
+coordination and recovery decisions about a key’s command conflicts, and
+for managing execution order.
+
+`CommandsForKey` is updated via `SafeCommandsForKey` after command
+execution in `SafeCommandStore#updateCommandsForKey`. `CommandsForKey`
+differentiates between managed and unmanaged transactions:
+
+* Managed transactions are transactions witnessed by `CommandsForKey`
+for dependency management (essentially all globally visible key
+transactions): simple key transactions, like reads and writes.
+* Unmanaged transactions are those that depend on the simple key
+transactions but are not themselves such, e.g. sync points, range
+transactions, etc. These transactions need only adopt a dependency on
+the Key to represent _all of these transactions_. CFK will then notify
+when they have executed.
+
+=== CommandStore’s auxiliary collections
+
+==== RedundantBefore
+
+`RedundantBefore` is (incrementally) persisted in the `Journal` and used
+by `CommandStore` to track transactions that have been fully applied or
+invalidated across all shards. Once a transaction is redundant
+(i.e. it has been either _applied_ or _invalidated_ durably on the
+majority of participants), its metadata can be removed, and only the
+transactional bounds need to be kept for dependency tracking purposes.
+`RedundantBefore` plays an important role during journal compaction (by
+providing information about which transactions can be purged).
+
+=== DurabilityService and (Exclusive)SyncPoint
+
+For the purposes of this document, we will only cover _Exclusive_
+SyncPoints, even though other kinds might still exist at the time of
+writing. `SyncPoints` serve as a logical barrier in transaction
+history, and are used for invalidating older `TxnId`s, so that a newly
+bootstrapped node may have a complete log as of a point-in-time `TxnId`,
+and replicas can purge/GC earlier transaction metadata.
+
+SyncPoints are not expected to be processed by the whole cluster,
+and we do not want transaction processing to be held up, so while these
+are processed much like a transaction, they are invisible to real
+transactions which may proceed before SyncPoint is witnessed by the node
+processing it.
+
+ExclusiveSyncPoint is created by `DurabilityScheduler`, as the first
+step for coordinating shard durability, which is scheduled for periodic
+execution. During this step, we perform initial rounds of `PreAccept`
+and `Accept` until we have reached agreement about when `SyncPoint`
+should execute.
+
+After a shard is marked durable, the `RedundantBefore` collection is
+updated, which serves an important role in bootstrap, log replay, log
+compaction, and replica-side command purging/invalidation.
+
+=== ConfigurationService and TopologyManager
+
+Time in Accord is sliced into epochs. Each epoch constitutes a unique
+cluster configuration (`Topology`). A topology represents the mapping
+between key ranges and nodes, where every range has to be replicated to a
+certain number of nodes. The coordinator assigns an epoch to each
+transaction; replicas may decline transactions that arrive for epochs
+that were previously closed.
+
+`TopologyManager` is responsible for listening to notifications about
+cluster configuration changes, and for the creation of epochs. Once an
+epoch is created, it needs to be bootstrapped before it is ready. Epoch
+readiness consists of 4 _independent_ states:
+
+* Metadata: The new epoch has been set up locally and the node is ready
+to process commands for it.
+* Coordinate: The node has retrieved enough remote information to answer
+coordination decisions for the epoch (including fast path decisions).
+Once a quorum of the new epoch has achieved this, earlier epochs do not
+need to be contacted by coordinators of transactions started in the new
+epoch (or later).
+* Data: The node has successfully replicated the underlying `DataStore`
+information for the new epoch, but may need to perform some additional
+coordination before it can execute the read portion of a transaction.
+* Reads: The node has retrieved enough remote information to safely
+process reads, including replicating all necessary DataStore
+information, and any additional transactions necessary for consistency.
+
+=== Data Store
+
+One of the most important integration points, `DataStore`, is responsible
+for applying transactional information to the database’s stable
+storage.
+
+=== Accord Journal
+
+==== Garbage Collection / Cleanup
+
+* `ERASE`: we can erase data once we are certain no other replicas
+require our information. Erased should ONLY be adopted on a replica that
+knows EVERY shard has successfully applied the transaction at all
+healthy replicas (or else that it is durably invalidated).
+* `EXPUNGE`: we can expunge data once we can reliably and safely expunge
+any partial record. To achieve the latter, we use only global summary
+information, the `TxnId` and, if present, any `applyAt`.
+* `INVALIDATE`: command has been decidedly (and durably) superseded
+by a different command (e.g., a higher ballot was witnessed
+during recovery), and will *never* be executed.
+* `VESTIGIAL`: command cannot be completed and is either pre-bootstrap,
+did not commit, or did not participate in this shard’s epoch.
+* `TRUNCATE`: a subset of the command metadata (i.e., deps, outcome,
+or appliedAt) can be discarded.
+
+== Contributing Changes to Accord
+
+Accord is covered by a large number of tests, but probably the most
+prominent among them is `BurnTest`. `BurnTest` is a deterministic
+simulation of the protocol with a strict serializability checker. It
+simulates time, message passing, concurrency, faults, and many other
+things. If you intend to make a change to Accord, it is recommended
+that you run `BurnTest` at the very least several dozen times in a
+loop to ensure the correctness of your change. `BurnTest` can also be
+useful for reasoning about and exploring protocol states: put a
+breakpoint at a spot you consider important, run the burn test, and see
+what’s going on.
+
+Accord also comes with many built-in assertions. The protocol has many
+checks for internal consistency that can be helpful during development.
+Most of the time, rather than triggering a strict serializability
+checker error, you will see some form of internal assertion detecting an
+inconsistency. These invariants are there for a reason, and in an
+overwhelming majority of cases disabling or ignoring them is not a good
+idea.
+
+== Cheat Sheet
+
+* Medium Path - is a coordinator optimization. This is the case where t0
+can be agreed (i.e. executeAt=txnId), and where we would like not to
+take 3 round-trips, as this situation is likely to occur when we lose
+the fast path quorum. The medium path permits only 2 round-trips because
+it can be used as a complete set of dependencies (due to their having
+been calculated against the correct bound, t0, and that bound having
+been applied at a quorum so that conflicting transactions will propose a
+higher executeAt).
+* `SaveStatus` vs `Status` - `SaveStatus` is a replica-local status that
+contains additional information helpful for tracking state machine state,
+and heavily used for validating internal consistency in Accord, while
+`Status` is a part of a distributed state machine that tracks distributed
+transaction state.
+* `Routable` - something that can be found in the cluster, and MAYBE
+found on disk (if `Seekable`).
+** `Unseekable` - _routing_ key; in Cassandra terms, you can think of a
+`Token`
+** `Seekable` - Something that can be found within the cluster AND found
+on disk, queried and returned; i.e., key or key range.
+* Route vs RoutingKey vs FullRoute vs PartialRoute -
+** `Partial` vs `Full` route are understood in the context of a single
+transaction.
diff --git a/doc/modules/cassandra/pages/developing/index.adoc b/doc/modules/cassandra/pages/developing/index.adoc
index 8c9f735e2c..409a423a6f 100644
--- a/doc/modules/cassandra/pages/developing/index.adoc
+++ b/doc/modules/cassandra/pages/developing/index.adoc
@@ -2,3 +2,4 @@
 
 * xref:cassandra:developing/data-modeling/index.adoc[Data Modeling]
 * xref:cassandra:developing/cql/index.adoc[CQL]
+* xref:cassandra:developing/accord/index.adoc[Accord]

