[
https://issues.apache.org/jira/browse/IGNITE-16907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexander Lapin updated IGNITE-16907:
-------------------------------------
Description:
h4. Problem
From a bird's-eye view, the raft-to-storage flow looks like this:
#
{code:java}
RaftGroupService#run(writeCommand());{code}
# Inner raft replication logic: once the command is replicated on a majority,
raft.commitedIndex is adjusted.
# Propagate the command to RaftGroupListener (the raft state machine):
{code:java}
RaftGroupListener#onWrite(closure(writeCommand()));{code}
# Within the state machine, insert the data from writeCommand into the underlying storage:
{code:java}
var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());{code}
# Acknowledge that the data was applied successfully:
{code:java}
clo.result(insertRes);{code}
# Move raft.appliedIndex to the corresponding value, meaning that the data for
this index has been applied to the state machine.
The most interesting part, especially for this ticket, relates to step 4.
In the real world, storage doesn't flush every mutator to disk. Instead, it
buffers a number of mutators and flushes them all together as part of a
checkpointing process or similar. Thus, if a node fails before
mutatorsBuffer.flush(), it will lose some data if, during local recovery,
raft applies data starting from appliedIndex + 1.
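The failure mode described above can be reproduced with a small, self-contained sketch. All names here (BufferedStorageLossDemo, BufferedStorage, crash()) are hypothetical and exist only for illustration; they are not Ignite APIs. The sketch assumes that appliedIndex survives the crash on the raft side, while the storage buffer does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo of the data-loss scenario; none of these names are Ignite APIs. */
class BufferedStorageLossDemo {
    /** Toy storage: writes land in a volatile buffer and reach "disk" only on flush(). */
    static class BufferedStorage {
        final Map<String, String> disk = new HashMap<>();           // survives a crash
        final Map<String, String> mutatorsBuffer = new HashMap<>(); // lost on a crash

        void insert(String key, String value) {
            mutatorsBuffer.put(key, value);
        }

        void flush() {
            disk.putAll(mutatorsBuffer);
            mutatorsBuffer.clear();
        }

        /** Simulates a node failure: everything that was not flushed is gone. */
        void crash() {
            mutatorsBuffer.clear();
        }
    }

    /** Runs the scenario and returns what is on disk after the naive recovery. */
    static Map<String, String> run() {
        List<String[]> raftLog = new ArrayList<>();
        BufferedStorage storage = new BufferedStorage();
        long appliedIndex = 0; // assumed to be persisted by raft

        for (int i = 1; i <= 4; i++) {
            String[] cmd = {"k" + i, "v" + i};
            raftLog.add(cmd);               // replicated and committed
            storage.insert(cmd[0], cmd[1]); // step 4: applied to storage (buffered)
            appliedIndex = i;               // step 6
            if (i == 3) {
                storage.flush();            // checkpoint happens after the third write
            }
        }

        storage.crash(); // k4 was only in the buffer

        // Naive local recovery: replay starts from appliedIndex + 1, so nothing is replayed.
        for (long idx = appliedIndex + 1; idx <= raftLog.size(); idx++) {
            String[] cmd = raftLog.get((int) idx - 1);
            storage.insert(cmd[0], cmd[1]);
        }
        storage.flush();
        return storage.disk;
    }

    public static void main(String[] args) {
        // k4 is missing from the result even though raft considers index 4 applied.
        System.out.println(run().keySet());
    }
}
```

The raft log still contains the command for k4, but nothing prompts raft to replay it, which is the gap this ticket targets.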
h4. Possible solutions:
There are several possibilities to solve this issue:
# In-storage WAL. Bad solution, because there is already a raft log that can be
used as a WAL. Such duplication is redundant.
# Local recovery starting from appliedIndex - mutatorsBuffer.size. Bad
solution. It won't work for non-idempotent operations, and it exposes inner
storage details such as mutatorsBuffer.size.
# proposedIndex propagation + checkpointIndex synchronization. Seems fine. More
details below:
* First of all, in order to coordinate the raft replicator and storage,
proposedIndex will be propagated to raftGroupListener and storage.
* On every checkpoint, storage will persist the corresponding proposed index as
checkpointIndex.
** In the case of inner storage checkpoints, storage won't notify the raft
replicator about the new checkpointIndex. This kind of notification is an
optimization that does not affect the correctness of the protocol.
** In the case of an outer checkpoint intention, e.g. raft snapshotting for the
purposes of raft log truncation, the corresponding checkpointIndex will be
propagated to the raft replicator within a callback "onShapshotDone".
* During local recovery, raft starts propagating raft log entries from the
very beginning. If checkpointIndex turns out to be greater than the
proposedIndex of a raft log entry, storage fails the proposed closure with
IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift
and optional async raft log truncation.
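The storage side of option 3 could look roughly like the sketch below. It is a minimal illustration, not the proposed implementation: CheckpointIndexSketch, Storage, insert(proposedIndex, ...), checkpoint(), restart() and get() are hypothetical names, and only IndexMismatchException is taken from the description. One deliberate assumption: the sketch rejects an entry when checkpointIndex >= proposedIndex rather than strictly greater, so that the entry at exactly checkpointIndex is never applied twice.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of option 3 on the storage side; names are illustrative, not Ignite APIs. */
class CheckpointIndexSketch {
    /** Raised when a replayed entry is already covered by the last checkpoint. */
    static class IndexMismatchException extends RuntimeException {
        final long checkpointIndex;

        IndexMismatchException(long checkpointIndex) {
            super("Entry is already checkpointed, checkpointIndex=" + checkpointIndex);
            this.checkpointIndex = checkpointIndex;
        }
    }

    static class Storage {
        private final Map<String, String> data = new HashMap<>();             // in memory
        private final Map<String, String> checkpointedData = new HashMap<>(); // "on disk"
        private long lastProposedIndex; // index of the last entry applied in memory
        private long checkpointIndex;   // persisted together with checkpointedData

        /** Applies a write that raft proposes at the given index. */
        void insert(long proposedIndex, String key, String value) {
            // Assumption of this sketch: ">=" instead of the ticket's "bigger than".
            if (checkpointIndex >= proposedIndex) {
                throw new IndexMismatchException(checkpointIndex);
            }
            data.put(key, value);
            lastProposedIndex = proposedIndex;
        }

        /** Persists the data together with the index it corresponds to. */
        long checkpoint() {
            checkpointedData.clear();
            checkpointedData.putAll(data);
            checkpointIndex = lastProposedIndex;
            return checkpointIndex;
        }

        /** Simulates a crash followed by storage recovery from the last checkpoint. */
        void restart() {
            data.clear();
            data.putAll(checkpointedData);
            lastProposedIndex = checkpointIndex;
        }

        String get(String key) {
            return data.get(key);
        }
    }
}
```

With this shape, the raft side needs no knowledge of the buffer size: it replays from the start of the log and lets the exception tell it where to resume.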
Let's consider the following example:
checkpointBuffer = 3. [P] - persisted entities, [!P] - not persisted/in-memory
ones.
# raft.put(k1,v1)
## -> raftlog[cmd(k1,v1, index:1)]
## -> storage[(k1,v1), index:1]
## -> appliedIndex:1
# raft.put(k2,v2)
## -> raftlog[cmd(k1,v1, index:1), {*}cmd(k2,v2, index:2){*}]
## -> storage[(k1,v1), {*}(k2,v2){*}, index:{*}2{*}]
## -> appliedIndex:{*}2{*}
# raft.put(k3,v3)
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), {*}cmd(k3,v3, index:3){*}]
## -> storage[(k1,v1), (k2,v2), {*}(k3,v3){*}, index:{*}3{*}]
## -> appliedIndex:{*}3{*}
## *inner storage checkpoint*
### raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
### storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3,
proposedIndex:3)]
### *checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]*
# raft.put(k4,v4)
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), {*}cmd(k4,v4, index:4){*}]
## -> storage[(k1,v1), (k2,v2), (k3,v3), {*}(k4,v4){*}, index:{*}4{*}]
## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
## -> appliedIndex:{*}4{*}
# Node failure
# Node restart
## StorageRecovery: storage.apply(checkpointedData)
## raft-to-storage data application starting from index: 1 // raft doesn't
know checkpointIndex at this point.
### -> storageResponse::IndexMismatchException(3)
#### raft-to-storage data application starting from index: 3 + 1
# Recovery result:
## -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3),
cmd(k4,v4, index:4)]
## -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4) index:4]
## -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
## -> appliedIndex:4
# Raft log truncation
## storage.forceCheckpoint
### -> raftlog[index:4]
### -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4) index:4]
### -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4) checkpointIndex:4]
### -> appliedIndex:4
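The walkthrough above can be replayed end to end with the following sketch. It is only an illustration under stated assumptions: RecoveryWalkthrough, Cmd, Storage, recover() and run() are hypothetical names, the storage rejects entries with checkpointIndex >= index (slightly stricter than "bigger than"), and log truncation keeps the entry at checkpointIndex to mirror raftlog[index:4] in the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical replay of the example above; all names are illustrative, not Ignite APIs. */
class RecoveryWalkthrough {
    static class IndexMismatchException extends RuntimeException {
        final long checkpointIndex;

        IndexMismatchException(long checkpointIndex) {
            this.checkpointIndex = checkpointIndex;
        }
    }

    /** A raft log entry: the index plus a put(key, value) command. */
    static class Cmd {
        final long index;
        final String key;
        final String value;

        Cmd(long index, String key, String value) {
            this.index = index;
            this.key = key;
            this.value = value;
        }
    }

    static class Storage {
        final TreeMap<String, String> data = new TreeMap<>();
        final TreeMap<String, String> checkpointedData = new TreeMap<>();
        long index;           // index of the last entry applied in memory
        long checkpointIndex; // index persisted with checkpointedData

        void apply(Cmd cmd) {
            if (checkpointIndex >= cmd.index) { // assumption: ">=" rather than ">"
                throw new IndexMismatchException(checkpointIndex);
            }
            data.put(cmd.key, cmd.value);
            index = cmd.index;
        }

        void checkpoint() {
            checkpointedData.clear();
            checkpointedData.putAll(data);
            checkpointIndex = index;
        }

        void restart() { // node failure + StorageRecovery from checkpointedData
            data.clear();
            data.putAll(checkpointedData);
            index = checkpointIndex;
        }
    }

    /** Replays the log from its beginning, skipping checkpointed entries; returns appliedIndex. */
    static long recover(List<Cmd> raftLog, Storage storage) {
        long appliedIndex = 0;
        int pos = 0;
        while (pos < raftLog.size()) {
            Cmd cmd = raftLog.get(pos);
            try {
                storage.apply(cmd);
                appliedIndex = cmd.index;
                pos++;
            } catch (IndexMismatchException e) {
                // Shift to checkpointIndex + 1, as in step "3 + 1" of the example.
                appliedIndex = e.checkpointIndex;
                while (pos < raftLog.size() && raftLog.get(pos).index <= e.checkpointIndex) {
                    pos++;
                }
            }
        }
        return appliedIndex;
    }

    static String run() {
        List<Cmd> raftLog = new ArrayList<>();
        Storage storage = new Storage();
        for (int i = 1; i <= 4; i++) {
            Cmd cmd = new Cmd(i, "k" + i, "v" + i);
            raftLog.add(cmd);
            storage.apply(cmd);
            if (i == 3) {
                storage.checkpoint(); // inner storage checkpoint
            }
        }
        storage.restart();
        long appliedIndex = recover(raftLog, storage);
        storage.checkpoint(); // storage.forceCheckpoint before log truncation
        final long cp = storage.checkpointIndex;
        raftLog.removeIf(c -> c.index < cp); // assumption: the entry at cp itself is kept
        return "applied=" + appliedIndex + " checkpoint=" + cp
            + " log=" + raftLog.size() + " data=" + storage.data;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Replaying from index 1 triggers a single IndexMismatchException(3), after which only the entry at index 4 is re-applied, matching the recovery result listed in the example.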
> Add ability to use Raft log as storage WAL within the scope of local recovery
> -----------------------------------------------------------------------------
>
> Key: IGNITE-16907
> URL: https://issues.apache.org/jira/browse/IGNITE-16907
> Project: Ignite
> Issue Type: Improvement
> Reporter: Alexander Lapin
> Priority: Major
> Labels: ignite-3
--
This message was sent by Atlassian Jira
(v8.20.7#820007)