
Semyon Danilov updated IGNITE-16668:
If a node storing a partition of an in-memory table fails and leaves the
cluster, all data it held is lost. From the point of view of the partition, it
looks as if the node has left forever.

Although the Raft protocol tolerates the loss of some of the nodes composing a
Raft group (partition), for in-memory caches we cannot restore the replica
factor because of the in-memory nature of the table.

It means that we need to detect failures of each node owning a partition and
recalculate assignments for the table without keeping the replica factor.
h4. Upd 1:
h4. Problem

By design, raft has several persisted segments, e.g. raft meta
(term/committedIndex) and the stable raft log. So, by converting common raft to
an in-memory one, it’s possible to break some of its invariants. For example,
Node C could vote for Candidate A before a self-restart and then vote for
Candidate B after it. As a result, two leaders will be elected, which is illegal.
!Screenshot from 2022-04-19 11-11-05.png!
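
The double vote can be reproduced with a toy model. This is a minimal sketch, not Ignite or JRaft code: the class InMemoryVoter and its methods are hypothetical. It assumes the standard Raft rule that a node grants at most one vote per term, which only holds if the vote survives a restart.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy voter that keeps its "voted for" state only in memory. */
class InMemoryVoter {
    // term -> candidate this node voted for; lost on restart because nothing is persisted
    private Map<Long, String> votedFor = new HashMap<>();

    /** Grants the vote unless this node already voted for a different candidate in the term. */
    boolean requestVote(long term, String candidate) {
        String previous = votedFor.putIfAbsent(term, candidate);
        return previous == null || previous.equals(candidate);
    }

    /** Simulates a restart of an in-memory node: all volatile state is gone. */
    void restart() {
        votedFor = new HashMap<>();
    }

    public static void main(String[] args) {
        InMemoryVoter nodeC = new InMemoryVoter();
        System.out.println(nodeC.requestVote(5, "A")); // granted to A in term 5
        System.out.println(nodeC.requestVote(5, "B")); // rejected: already voted in term 5
        nodeC.restart();
        System.out.println(nodeC.requestVote(5, "B")); // granted: the earlier vote was forgotten
    }
}
```

After the restart the same node grants a second vote in the same term, which is exactly the situation that lets both A and B collect a majority.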
h4. Solution

In order to solve the problem mentioned above, it’s possible to remove the
restarting node from the peers of the corresponding raft group and then return
it back. The peer-removal process should be finished before the corresponding
raft server node is restarted.
  !Screenshot from 2022-04-19 11-12-55.png!
The process of removing and then returning the restarting node is, however,
itself tricky. To explain why it’s a non-trivial action, it’s necessary to
outline the main ideas of the rebalance protocol.

Reconfiguration of the raft group is a process driven by changes to the
assignments. Each partition has three corresponding sets of assignments
stored in the metastore:
 # assignments.stable - the current distribution
 # assignments.pending - the partition distribution for an ongoing rebalance, if any
 # assignments.planned - in some cases it’s not possible to cancel a pending rebalance or merge it with a new one. In that case the newly calculated assignments are stored explicitly under the corresponding assignments.planned key. It's worth noting that it doesn't make sense to keep more than one planned rebalance: any newly scheduled one overwrites the existing one.

However, this idea of overwriting the assignments.planned key won’t work in
the context of an in-memory raft restart, because it’s not valid to overwrite
a reduction of assignments. Let's illustrate this problem with the following
example.
 # In-memory partition p1 is hosted on nodes A, B and C, meaning that p1.assignments.stable=[A,B,C]
 # Let's say that the baseline was changed, resulting in a rebalance on assignments.pending=[A,B,C,*D*]
 # During the non-cancelable phase of [A,B,C]->[A,B,C,D], node C fails and returns back, meaning that we should plan both the [A,B,D] and [A,B,C,D] assignments. Both must be recorded in the single assignments.planned key, meaning that [A,B,C,D] will overwrite the reduction [A,B,D], so no actual raft reconfiguration will take place, which is not acceptable.
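
The overwrite in the example above can be shown with a few lines of Java. This is only an illustrative sketch: PlannedSlot is a hypothetical stand-in for the single assignments.planned key, not an Ignite class.

```java
import java.util.List;

/** Hypothetical model of a metastore key that holds at most one planned assignment. */
class PlannedSlot {
    private List<String> planned; // null means "nothing planned"

    /** Scheduling a new planned rebalance silently replaces the previous one. */
    void plan(List<String> assignments) {
        planned = List.copyOf(assignments);
    }

    List<String> planned() {
        return planned;
    }

    public static void main(String[] args) {
        PlannedSlot slot = new PlannedSlot();
        slot.plan(List.of("A", "B", "D"));      // reduction: C must leave the group
        slot.plan(List.of("A", "B", "C", "D")); // C is back: overwrites the reduction
        // The only surviving plan equals the pending target, so C is never removed.
        System.out.println(slot.planned());
    }
}
```

Because the surviving plan is identical to the target of the running rebalance, the raft group never goes through a configuration without C.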

In order to overcome this issue, let’s introduce two new keys:
_assignments.switch.reduce_, which will hold the nodes that should be removed,
and _assignments.switch.append_, which will hold the nodes that should be
returned back, and run the following actions:
h5. On in-memory partition restart (or on partition start with cleaned-up PDS)

Within a retry loop, add the current node to the assignments.switch.reduce set:
{code:java}
do {
    retrievedAssignmentsSwitchReduce = metastorage.read(assignments.switch.reduce);
    calculatedAssignmentsSwitchReduce = union(retrievedAssignmentsSwitchReduce.value, currentNode);

    if (retrievedAssignmentsSwitchReduce.isEmpty()) {
        invokeRes = metastoreInvoke:
            if empty(assignments.switch.reduce)
                assignments.switch.reduce = calculatedAssignmentsSwitchReduce
    } else {
        invokeRes = metastoreInvoke:
            if eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
                assignments.switch.reduce = calculatedAssignmentsSwitchReduce
    }
} while (!invokeRes);{code}
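
The same read, compute and conditional-write loop can be sketched in plain Java. This is a sketch under stated assumptions, not the metastore API: SwitchReduceKey and Entry are hypothetical names, and an AtomicReference compare-and-set plays the role of the revision check inside metastoreInvoke.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical in-process stand-in for the assignments.switch.reduce metastore key. */
class SwitchReduceKey {
    /** Immutable snapshot: the node set plus the revision at which it was written. */
    static final class Entry {
        final Set<String> nodes;
        final long revision;

        Entry(Set<String> nodes, long revision) {
            this.nodes = Collections.unmodifiableSet(new TreeSet<>(nodes));
            this.revision = revision;
        }
    }

    private final AtomicReference<Entry> key =
            new AtomicReference<>(new Entry(Collections.emptySet(), 0));

    /** Adds a node with a read / compute / conditional-write loop, retrying on conflict. */
    Entry addNode(String currentNode) {
        while (true) {
            Entry retrieved = key.get();
            Set<String> calculated = new TreeSet<>(retrieved.nodes);
            calculated.add(currentNode);
            Entry updated = new Entry(calculated, retrieved.revision + 1);
            // Succeeds only if nobody replaced the entry since it was read,
            // which stands in for the revision precondition of the invoke.
            if (key.compareAndSet(retrieved, updated)) {
                return updated;
            }
        }
    }

    public static void main(String[] args) {
        SwitchReduceKey reduce = new SwitchReduceKey();
        reduce.addNode("C");
        Entry entry = reduce.addNode("D");
        System.out.println(entry.nodes + " @ revision " + entry.revision);
    }
}
```

The retry matters when two nodes restart at the same time: whichever write loses the race re-reads the set and adds itself to the winner's result instead of overwriting it.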
h5. On assignments.switch.reduce change on corresponding partition leader

Within the watch listener on the assignments.switch.reduce key, the
corresponding partition leader triggers a new rebalance if there is no pending
one.
{code:java}
calculatedAssignments = subtract(calcPartAssignments(), assignments.switch.reduce);

metastoreInvoke:
    if empty(partition.assignments.change.trigger.revision) || partition.assignments.change.trigger.revision < event.revision
        if empty(assignments.pending)
            assignments.pending = calculatedAssignments
            partition.assignments.change.trigger.revision = event.revision
{code}
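
A hedged Java rendering of the listener logic follows. ReduceListenerSketch and its fields are hypothetical and only mirror the metastore keys named above; the real invoke is atomic in the metastore, whereas this sketch is single-threaded.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical leader-side state for one partition; fields mirror the metastore keys. */
class ReduceListenerSketch {
    Set<String> pending = new TreeSet<>(); // assignments.pending, empty if no rebalance is running
    long triggerRevision = -1;             // partition.assignments.change.trigger.revision, -1 if unset

    /** Handles a change of assignments.switch.reduce; returns true if a rebalance was scheduled. */
    boolean onSwitchReduceChanged(Set<String> calcPartAssignments,
                                  Set<String> switchReduce,
                                  long eventRevision) {
        if (triggerRevision >= eventRevision) {
            return false; // stale event: this revision was already handled
        }
        if (!pending.isEmpty()) {
            return false; // a rebalance is already in progress
        }
        // subtract(calcPartAssignments(), assignments.switch.reduce)
        Set<String> calculated = new TreeSet<>(calcPartAssignments);
        calculated.removeAll(switchReduce);
        pending = calculated;
        triggerRevision = eventRevision;
        return true;
    }

    public static void main(String[] args) {
        ReduceListenerSketch leader = new ReduceListenerSketch();
        boolean scheduled = leader.onSwitchReduceChanged(Set.of("A", "B", "C"), Set.of("C"), 10);
        System.out.println(scheduled + " " + leader.pending);
        // A replay of the same event is ignored because of the trigger revision.
        System.out.println(leader.onSwitchReduceChanged(Set.of("A", "B", "C"), Set.of("C"), 10));
    }
}
```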
h5. On rebalance done

changePeers() calls the onRebalanceDone closure with the set of newPeers.
{code:java}
do {
    retrievedAssignments[Stable, Pending, SwitchReduce, SwitchAppend, Planned] = readRebalanceKeys();
    resolvedStable = resolveClusterNodes(newPeers);

    // Were reduced
    reducedNodes = subtract(retrievedAssignmentsSwitchReduce, resolvedStable);

    // Were added
    addedNodes = subtract(resolvedStable, retrievedAssignmentsStable);

    // For further reduction
    calculatedAssignmentsSwitchReduce = subtract(retrievedAssignmentsSwitchReduce, reducedNodes);

    // For further addition
    calculatedAssignmentsSwitchAppend = union(retrievedAssignmentsSwitchAppend, reducedNodes);
    calculatedAssignmentsSwitchAppend = subtract(calculatedAssignmentsSwitchAppend, addedNodes);
    calculatedAssignmentsSwitchAppend = intersect(calcPartAssignments(), calculatedAssignmentsSwitchAppend);

    calculatedPendingReduction = subtract(resolvedStable, retrievedAssignmentsSwitchReduce);

    calculatedPendingAddition = union(resolvedStable, reducedNodes);
    calculatedPendingAddition = intersect(calcPartAssignments(), calculatedPendingAddition);

    // Retry preconditions should be checked inside every invoke.
    <retryPreconditions>
        eq(revision(assignments.stable), retrievedAssignmentsStable.revision) and
        eq(revision(assignments.pending), retrievedAssignmentsPending.revision) and
        eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision) and
        eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
    </retryPreconditions>

    // There are nodes that should be returned back
    if (!empty(calculatedAssignmentsSwitchAppend)) {
        invokeRes = metastoreInvoke:
            if (<inline retryPreconditions>)
                assignments.stable = resolvedStable
                assignments.pending = calculatedPendingAddition
                assignments.switch.reduce = calculatedAssignmentsSwitchReduce
                assignments.switch.append = calculatedAssignmentsSwitchAppend
    } else if (!empty(calculatedAssignmentsSwitchReduce)) {
        invokeRes = metastoreInvoke:
            if (<inline retryPreconditions>)
                assignments.stable = resolvedStable
                assignments.pending = calculatedPendingReduction
                assignments.switch.reduce = calculatedAssignmentsSwitchReduce
                assignments.switch.append = calculatedAssignmentsSwitchAppend
    } else {
        invokeRes = metastoreInvoke:
            if (<inline retryPreconditions>)
                // Common rebalance actions (moving planned to pending, if any), plus:
                assignments.switch.append = calculatedAssignmentsSwitchAppend
    }
} while (!invokeRes);{code}
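
To see how the set arithmetic plays out, here is a minimal single-threaded sketch that replays the [A,B,C] -> [A,B,C,D] example with node C restarting. It is an illustration under assumptions, not the actual implementation: RebalanceDoneSketch is a hypothetical class, the revision preconditions and the retry loop are omitted, and the common path simply clears pending instead of handling assignments.planned.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of the rebalance keys of a single partition. */
class RebalanceDoneSketch {
    Set<String> stable = new TreeSet<>();
    Set<String> pending = new TreeSet<>();
    Set<String> switchReduce = new TreeSet<>();
    Set<String> switchAppend = new TreeSet<>();

    static Set<String> union(Set<String> a, Set<String> b) {
        Set<String> r = new TreeSet<>(a);
        r.addAll(b);
        return r;
    }

    static Set<String> subtract(Set<String> a, Set<String> b) {
        Set<String> r = new TreeSet<>(a);
        r.removeAll(b);
        return r;
    }

    static Set<String> intersect(Set<String> a, Set<String> b) {
        Set<String> r = new TreeSet<>(a);
        r.retainAll(b);
        return r;
    }

    /** Updates the keys for a finished rebalance; calcPart stands for calcPartAssignments(). */
    void onRebalanceDone(Set<String> newPeers, Set<String> calcPart) {
        Set<String> resolvedStable = new TreeSet<>(newPeers);
        Set<String> reducedNodes = subtract(switchReduce, resolvedStable);
        Set<String> addedNodes = subtract(resolvedStable, stable);
        Set<String> nextReduce = subtract(switchReduce, reducedNodes);
        Set<String> nextAppend =
                intersect(calcPart, subtract(union(switchAppend, reducedNodes), addedNodes));
        Set<String> pendingReduction = subtract(resolvedStable, switchReduce);
        Set<String> pendingAddition = intersect(calcPart, union(resolvedStable, reducedNodes));

        stable = resolvedStable;
        if (!nextAppend.isEmpty()) {
            pending = pendingAddition;  // return the previously removed nodes
        } else if (!nextReduce.isEmpty()) {
            pending = pendingReduction; // remove the restarted nodes first
        } else {
            pending = new TreeSet<>();  // common path; planned handling omitted (assumption)
        }
        switchReduce = nextReduce;
        switchAppend = nextAppend;
    }

    public static void main(String[] args) {
        Set<String> all = Set.of("A", "B", "C", "D");
        RebalanceDoneSketch p = new RebalanceDoneSketch();
        p.stable = new TreeSet<>(Set.of("A", "B", "C"));
        p.switchReduce = new TreeSet<>(Set.of("C")); // C restarted during the rebalance

        p.onRebalanceDone(all, all);                  // [A,B,C,D] applied, C still to be removed
        System.out.println(p.pending);
        p.onRebalanceDone(Set.of("A", "B", "D"), all); // C removed, now scheduled to return
        System.out.println(p.pending + " append=" + p.switchAppend);
        p.onRebalanceDone(all, all);                  // C returned, switch keys are empty
        System.out.println(p.switchReduce + " " + p.switchAppend);
    }
}
```

In this replay the group passes through [A,B,D] before C is added back, which is the reconfiguration that a single assignments.planned key would have skipped.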
It’s also possible to update onCommonRebalance in a similar way; however, it’s
only an optimization, because onRebalanceDone will always treat the switch as a
higher-priority action than a planned rebalance triggered by a baseline change
or a replica factor change.

Besides that, in many cases, especially for in-memory partitions, it is worth
removing the leaving node from the raft group peers not during its restart but
on the corresponding networkTopology.onDisappeared() event. However, that is
only an optimization. E.g. in the case of a persisted table with a cleaned-up
PDS, it’s not known whether the PDS was cleaned up until the node's self-check
on table start-up. Thus, the correctness of the algorithm is achieved solely by
the actions taken when the node is restarted; the rest is optimization.

All in all, the general flow on node restart will look like the following:
{code:java}
private void onPartitionStart(Partition p) {
    if (p.table().inMemory() || !p.localStorage().isAvailable()) {
        if (p.isMajorityAvailable()) {
            <inline metaStorageInvoke*>
            // Do not start the corresponding raft server!
        } else if (fullPartitionRestart) {
            commonStart();
        } else {
            // Do nothing.
        }
    } else {
        commonStart();
    }
} {code}
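
The branching above can be expressed as a small pure function, which makes the cases easy to check. This is a sketch only: PartitionStartSketch, the Action enum and the boolean parameters are hypothetical names for the conditions in the flow, not Ignite APIs.

```java
/** Hypothetical decision helper mirroring the branches of the restart flow. */
class PartitionStartSketch {
    enum Action { REQUEST_REMOVAL_WITHOUT_RAFT_START, COMMON_START, DO_NOTHING }

    static Action decide(boolean inMemory, boolean localStorageAvailable,
                         boolean majorityAvailable, boolean fullPartitionRestart) {
        if (inMemory || !localStorageAvailable) {
            if (majorityAvailable) {
                // Add this node to assignments.switch.reduce and do not start the raft server.
                return Action.REQUEST_REMOVAL_WITHOUT_RAFT_START;
            } else if (fullPartitionRestart) {
                return Action.COMMON_START;
            } else {
                return Action.DO_NOTHING;
            }
        }
        // Persistent partition with intact local storage.
        return Action.COMMON_START;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, true, false));  // in-memory, majority alive
        System.out.println(decide(true, false, false, true));  // in-memory, whole partition restarts
        System.out.println(decide(false, true, false, false)); // persistent, storage intact
    }
}
```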


> Design in-memory raft group reconfiguration on node failure
> -----------------------------------------------------------
>                 Key: IGNITE-16668
>                 URL: https://issues.apache.org/jira/browse/IGNITE-16668
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Sergey Chugunov
>            Assignee: Alexander Lapin
>            Priority: Major
>              Labels: ignite-3
>         Attachments: Screenshot from 2022-04-19 11-11-05.png, Screenshot from 
> 2022-04-19 11-12-55-1.png, Screenshot from 2022-04-19 11-12-55.png
> If a node storing a partition of an in-memory table fails and leaves the
> cluster, all data it held is lost. From the point of view of the partition, it
> looks as if the node has left forever.
> Although the Raft protocol tolerates the loss of some of the nodes composing a
> Raft group (partition), for in-memory caches we cannot restore the replica
> factor because of the in-memory nature of the table.
> It means that we need to detect failures of each node owning a partition and
> recalculate assignments for the table without keeping the replica factor.
> h4. Upd 1:
> h4. Problem
> By design, raft has several persisted segments, e.g. raft meta
> (term/committedIndex) and the stable raft log. So, by converting common raft
> to an in-memory one, it’s possible to break some of its invariants. For
> example, Node C could vote for Candidate A before a self-restart and then vote
> for Candidate B after it. As a result, two leaders will be elected, which is
> illegal.
> !Screenshot from 2022-04-19 11-11-05.png!
> h4. Solution
> In order to solve the problem mentioned above, it’s possible to remove the
> restarting node from the peers of the corresponding raft group and then return
> it back. The peer-removal process should be finished before the corresponding
> raft server node is restarted.
>   !Screenshot from 2022-04-19 11-12-55.png!
> The process of removing and then returning the restarting node is, however,
> itself tricky. To explain why it’s a non-trivial action, it’s necessary to
> outline the main ideas of the rebalance protocol.
> Reconfiguration of the raft group is a process driven by changes to the
> assignments. Each partition has three corresponding sets of assignments
> stored in the metastore:
>  # assignments.stable - the current distribution
>  # assignments.pending - the partition distribution for an ongoing rebalance,
> if any
>  # assignments.planned - in some cases it’s not possible to cancel a pending
> rebalance or merge it with a new one. In that case the newly calculated
> assignments are stored explicitly under the corresponding assignments.planned
> key. It's worth noting that it doesn't make sense to keep more than one
> planned rebalance: any newly scheduled one overwrites the existing one.
> However, this idea of overwriting the assignments.planned key won’t work in
> the context of an in-memory raft restart, because it’s not valid to overwrite
> a reduction of assignments. Let's illustrate this problem with the following
> example.
>  # In-memory partition p1 is hosted on nodes A, B and C, meaning that
> p1.assignments.stable=[A,B,C]
>  # Let's say that the baseline was changed, resulting in a rebalance on
> assignments.pending=[A,B,C,*D*]
>  # During the non-cancelable phase of [A,B,C]->[A,B,C,D], node C fails and
> returns back, meaning that we should plan both the [A,B,D] and [A,B,C,D]
> assignments. Both must be recorded in the single assignments.planned key,
> meaning that [A,B,C,D] will overwrite the reduction [A,B,D], so no actual raft
> reconfiguration will take place, which is not acceptable.
> In order to overcome this issue, let’s introduce two new keys:
> _assignments.switch.reduce_, which will hold the nodes that should be removed,
> and _assignments.switch.append_, which will hold the nodes that should be
> returned back, and run the following actions:
> h5. On in-memory partition restart (or on partition start with cleaned-up PDS)
> Within a retry loop, add the current node to the assignments.switch.reduce set:
> {code:java}
> do {
>     retrievedAssignmentsSwitchReduce = metastorage.read(assignments.switch.reduce);
>     calculatedAssignmentsSwitchReduce = union(retrievedAssignmentsSwitchReduce.value, currentNode);
>     if (retrievedAssignmentsSwitchReduce.isEmpty()) {
>         invokeRes = metastoreInvoke:
>             if empty(assignments.switch.reduce)
>                 assignments.switch.reduce = calculatedAssignmentsSwitchReduce
>     } else {
>         invokeRes = metastoreInvoke:
>             if eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
>                 assignments.switch.reduce = calculatedAssignmentsSwitchReduce
>     }
> } while (!invokeRes);{code}
> h5. On assignments.switch.reduce change on corresponding partition leader
> Within the watch listener on the assignments.switch.reduce key, the
> corresponding partition leader triggers a new rebalance if there is no pending
> one.
> {code:java}
> calculatedAssignments = subtract(calcPartAssignments(), assignments.switch.reduce);
> metastoreInvoke:
>     if empty(partition.assignments.change.trigger.revision) || partition.assignments.change.trigger.revision < event.revision
>         if empty(assignments.pending)
>             assignments.pending = calculatedAssignments
>             partition.assignments.change.trigger.revision = event.revision
> {code}
> h5. On rebalance done
> changePeers() calls the onRebalanceDone closure with the set of newPeers.
> {code:java}
> do {
>     retrievedAssignments[Stable, Pending, SwitchReduce, SwitchAppend, Planned] = readRebalanceKeys();
>     resolvedStable = resolveClusterNodes(newPeers);
>     // Were reduced
>     reducedNodes = subtract(retrievedAssignmentsSwitchReduce, resolvedStable);
>     // Were added
>     addedNodes = subtract(resolvedStable, retrievedAssignmentsStable);
>     // For further reduction
>     calculatedAssignmentsSwitchReduce = subtract(retrievedAssignmentsSwitchReduce, reducedNodes);
>     // For further addition
>     calculatedAssignmentsSwitchAppend = union(retrievedAssignmentsSwitchAppend, reducedNodes);
>     calculatedAssignmentsSwitchAppend = subtract(calculatedAssignmentsSwitchAppend, addedNodes);
>     calculatedAssignmentsSwitchAppend = intersect(calcPartAssignments(), calculatedAssignmentsSwitchAppend);
>     calculatedPendingReduction = subtract(resolvedStable, retrievedAssignmentsSwitchReduce);
>     calculatedPendingAddition = union(resolvedStable, reducedNodes);
>     calculatedPendingAddition = intersect(calcPartAssignments(), calculatedPendingAddition);
>     // Retry preconditions should be checked inside every invoke.
>     <retryPreconditions>
>         eq(revision(assignments.stable), retrievedAssignmentsStable.revision) and
>         eq(revision(assignments.pending), retrievedAssignmentsPending.revision) and
>         eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision) and
>         eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
>     </retryPreconditions>
>     // There are nodes that should be returned back
>     if (!empty(calculatedAssignmentsSwitchAppend)) {
>         invokeRes = metastoreInvoke:
>             if (<inline retryPreconditions>)
>                 assignments.stable = resolvedStable
>                 assignments.pending = calculatedPendingAddition
>                 assignments.switch.reduce = calculatedAssignmentsSwitchReduce
>                 assignments.switch.append = calculatedAssignmentsSwitchAppend
>     } else if (!empty(calculatedAssignmentsSwitchReduce)) {
>         invokeRes = metastoreInvoke:
>             if (<inline retryPreconditions>)
>                 assignments.stable = resolvedStable
>                 assignments.pending = calculatedPendingReduction
>                 assignments.switch.reduce = calculatedAssignmentsSwitchReduce
>                 assignments.switch.append = calculatedAssignmentsSwitchAppend
>     } else {
>         invokeRes = metastoreInvoke:
>             if (<inline retryPreconditions>)
>                 // Common rebalance actions (moving planned to pending, if any), plus:
>                 assignments.switch.append = calculatedAssignmentsSwitchAppend
>     }
> } while (!invokeRes);{code}
> It’s also possible to update onCommonRebalance in a similar way; however,
> it’s only an optimization, because onRebalanceDone will always treat the
> switch as a higher-priority action than a planned rebalance triggered by a
> baseline change or a replica factor change.
> Besides that, in many cases, especially for in-memory partitions, it is worth
> removing the leaving node from the raft group peers not during its restart but
> on the corresponding networkTopology.onDisappeared() event. However, that is
> only an optimization. E.g. in the case of a persisted table with a cleaned-up
> PDS, it’s not known whether the PDS was cleaned up until the node's self-check
> on table start-up. Thus, the correctness of the algorithm is achieved solely
> by the actions taken when the node is restarted; the rest is optimization.
> All in all, the general flow on node restart will look like the following:
> {code:java}
> private void onPartitionStart(Partition p) {
>     if (p.table().inMemory() || !p.localStorage().isAvailable()) {
>         if (p.isMajorityAvailable()) {
>             <inline metaStorageInvoke*>
>             // Do not start the corresponding raft server!
>         } else if (fullPartitionRestart) {
>             commonStart();
>         } else {
>             // Do nothing.
>         }
>     } else {
>         commonStart();
>     }
> } {code}

This message was sent by Atlassian Jira
