[
https://issues.apache.org/jira/browse/IGNITE-16907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ivan Bessonov reassigned IGNITE-16907:
--------------------------------------
Assignee: Ivan Bessonov
> Add ability to use Raft log as storage WAL within the scope of local recovery
> -----------------------------------------------------------------------------
>
> Key: IGNITE-16907
> URL: https://issues.apache.org/jira/browse/IGNITE-16907
> Project: Ignite
> Issue Type: Improvement
> Reporter: Alexander Lapin
> Assignee: Ivan Bessonov
> Priority: Major
> Labels: ignite-3
>
> h4. Problem
> From a bird's eye view, the raft-to-storage flow looks like this:
> #
> {code:java}
> RaftGroupService#run(writeCommand());{code}
> # Inner raft replication logic: once the command is replicated on a majority,
> raft.committedIndex is adjusted.
> # Propagate the command to the RaftGroupListener (raft state machine).
> {code:java}
> RaftGroupListener#onWrite(closure(writeCommand()));{code}
> # Within the state machine, insert the data from the writeCommand into the underlying storage:
> {code:java}
> var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());{code}
> # Ack that the data was applied successfully:
> {code:java}
> clo.result(insertRes);{code}
> # Move raft.appliedIndex to the corresponding value, meaning that the data for
> this index has been applied to the state machine (steps 3-5 are sketched right
> after this list).
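> A minimal sketch of steps 3-5 put together (the types below - WriteCommand,
> CommandClosure, Storage, StateMachineListener - are simplified stand-ins for
> illustration, not the actual Ignite API):
> {code:java}
> // All types here are illustrative stand-ins for the real raft/storage interfaces.
> interface WriteCommand {
>     Object getRow();
>     long getTimestamp();
> }
>
> interface CommandClosure<T> {
>     T command();
>     void result(Object res); // step 5: ack back to the raft replicator
> }
>
> interface Storage {
>     boolean insert(Object row, long timestamp);
> }
>
> class StateMachineListener {
>     private final Storage storage;
>
>     StateMachineListener(Storage storage) {
>         this.storage = storage;
>     }
>
>     // Step 3: the replicated command reaches the state machine.
>     void onWrite(CommandClosure<WriteCommand> clo) {
>         WriteCommand cmd = clo.command();
>
>         // Step 4: apply the command to the underlying storage (possibly only buffered).
>         boolean insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());
>
>         // Step 5: ack the result; after that the raft side advances appliedIndex (step 6).
>         clo.result(insertRes);
>     }
> }
> {code}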
> The most interesting part, especially for this ticket, relates to step 4.
> In the real world the storage doesn't flush every mutator to disk; instead it
> buffers a number of such mutators and flushes them all together as part of some
> checkpointing process. Thus, if the node fails before mutatorsBuffer.flush(),
> it might lose some data, because on recovery raft will only apply data starting
> from appliedIndex + 1.
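> To make the failure window concrete, here is a hedged sketch (all names,
> including mutatorsBuffer, are illustrative) of a storage that only makes data
> durable on checkpoint:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Illustrative buffered storage: writes become durable only after checkpoint().
> class BufferedStorage {
>     private final List<String> mutatorsBuffer = new ArrayList<>(); // in memory, lost on crash
>     private final List<String> disk = new ArrayList<>();           // stands in for the persistent medium
>
>     void insert(String row) {
>         // Only buffered; on the raft side appliedIndex may already move past this entry.
>         mutatorsBuffer.add(row);
>     }
>
>     void checkpoint() {
>         // Only here do the buffered mutators become durable.
>         disk.addAll(mutatorsBuffer);
>         mutatorsBuffer.clear();
>     }
> }
> {code}
> If the node dies after appliedIndex has advanced but before checkpoint(), the
> buffered rows vanish, and recovery never re-applies them because it starts
> above appliedIndex.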
> h4. Possible solutions:
> There are several possible ways to solve this issue:
> # In-storage WAL. A bad solution: there is already a raft log that can be used
> as a WAL, so such duplication would be redundant.
> # Local recovery starting from appliedIndex - mutatorsBuffer.size. A bad
> solution: it won't work for non-idempotent operations and exposes inner
> storage details such as mutatorsBuffer.size.
> # proposedIndex propagation + checkpointIndex synchronization. Seems fine.
> More details below:
> * First of all, in order to coordinate the raft replicator and the storage,
> the proposedIndex should be propagated to the raftGroupListener and the storage.
> * On every checkpoint, the storage will persist the corresponding proposed index
> as the checkpointIndex.
> ** In case of storage-internal checkpoints, the storage won't notify the raft
> replicator about the new checkpointIndex. This kind of notification is an
> optimization that does not affect the correctness of the protocol.
> ** In case of an outer checkpoint intention, e.g. raft snapshotting for the
> purposes of raft log truncation, the corresponding checkpointIndex will be
> propagated to the raft replicator within an "onSnapshotDone" callback.
> * During local recovery, raft will apply raft log entries from the very
> beginning. If the checkpointIndex turns out to be bigger than the proposedIndex
> of a given raft log entry, the storage fails the proposed closure with
> IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift
> and an optional async raft log truncation. A minimal sketch of this protocol
> follows this list.
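> A minimal sketch of option 3, assuming a hypothetical IndexMismatchException
> and a storage that remembers the proposedIndex of the last applied command and
> persists it as checkpointIndex on every checkpoint (names are illustrative):
> {code:java}
> // Hypothetical sketch of option 3; not the actual Ignite API.
> class IndexMismatchException extends RuntimeException {
>     final long checkpointIndex;
>
>     IndexMismatchException(long checkpointIndex) {
>         super("Storage is already durable up to index " + checkpointIndex);
>         this.checkpointIndex = checkpointIndex;
>     }
> }
>
> class IndexAwareStorage {
>     private long lastProposedIndex; // follows every applied command, in memory
>     private long checkpointIndex;   // persisted together with the checkpointed data
>
>     // Normal write path: the raft layer propagates proposedIndex along with the command.
>     void insert(Object row, long proposedIndex) {
>         // During recovery, entries at or below checkpointIndex are already durable.
>         if (proposedIndex <= checkpointIndex) {
>             throw new IndexMismatchException(checkpointIndex);
>         }
>
>         // ... apply the row to the buffered / in-memory state ...
>         lastProposedIndex = proposedIndex;
>     }
>
>     // Checkpoint: the data and the matching index become durable as one unit.
>     void checkpoint() {
>         // ... flush the buffered data ...
>         checkpointIndex = lastProposedIndex;
>     }
> }
> {code}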
> Let's consider the following example, with checkpointBuffer = 3. [P] - persisted
> entries, [!P] - not persisted / in-memory ones. (A sketch of the replay loop
> follows the example.)
> # raft.put(k1,v1)
> ## -> raftlog[cmd(k1,v1, index:1)]
> ## -> storage[(k1,v1), index:1]
> ## -> appliedIndex:1
> # raft.put(k2,v2)
> ## -> raftlog[cmd(k1,v1, index:1), *cmd(k2,v2, index:2)*]
> ## -> storage[(k1,v1), *(k2,v2)*, index:*2*]
> ## -> appliedIndex:*2*
> # raft.put(k3,v3)
> ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), *cmd(k3,v3, index:3)*]
> ## -> storage[(k1,v1), (k2,v2), *(k3,v3)*, index:*3*]
> ## -> appliedIndex:*3*
> ## *inner storage checkpoint*
> ### raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
> ### storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3,
> proposedIndex:3)]
> ### *checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]*
> # raft.put(k4,v4)
> ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3),
> *cmd(k4,v4, index:4)*]
> ## -> storage[(k1,v1), (k2,v2), (k3,v3), *(k4,v4)*, index:*4*]
> ## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
> ## -> appliedIndex:*4*
> # Node failure
> # Node restart
> ## StorageRecovery: storage.apply(checkpointedData)
> ## raft-to-storage data application starting from index: 1 // raft doesn't
> know the checkpointIndex at this point.
> ### -> storageResponse::IndexMismatchException(3)
> #### raft-to-storage data application starting from index: 3 + 1
> # Recovery result:
> ## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3,
> index:3), cmd(k4,v4, index:4)]
> ## -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
> ## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
> ## -> appliedIndex:4
> # Raft log truncation
> ## storage.forceCheckpoint
> ### -> raftlog[index:4]
> ### -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
> ### -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4), checkpointIndex:4]
> ### -> appliedIndex:4
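> For completeness, a hedged sketch of the replay loop from step 6 of the example
> above, reusing the illustrative IndexAwareStorage and IndexMismatchException
> from the earlier sketch: replay starts from index 1, and the first
> IndexMismatchException fast-forwards it to checkpointIndex + 1.
> {code:java}
> // Illustrative recovery driver: replays the raft log into the storage from the very
> // beginning and skips ahead once the storage reports its checkpointIndex.
> class RecoveryDriver {
>     void recover(RaftLog log, IndexAwareStorage storage) {
>         long index = 1;
>
>         while (index <= log.lastIndex()) {
>             try {
>                 storage.insert(log.row(index), index);
>                 index++;
>             } catch (IndexMismatchException e) {
>                 // Already durable up to checkpointIndex: resume right after it and,
>                 // optionally, truncate the raft log up to that index asynchronously.
>                 index = e.checkpointIndex + 1;
>             }
>         }
>     }
> }
>
> interface RaftLog {
>     long lastIndex();
>     Object row(long index);
> }
> {code}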
--
This message was sent by Atlassian Jira
(v8.20.10#820010)