This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0280dc5446 IGNITE-23870 Implement a method to receive a chain of 
topology changes (#4966)
0280dc5446 is described below

commit 0280dc5446fafd63e067d78c0901369f2d235a6f
Author: Cyrill <[email protected]>
AuthorDate: Mon Dec 30 12:35:21 2024 +0300

    IGNITE-23870 Implement a method to receive a chain of topology changes 
(#4966)
---
 .../RebalanceRaftGroupEventsListener.java          |  96 ++++-
 .../distributionzones/rebalance/RebalanceUtil.java |  15 +
 .../partitiondistribution/AssignmentsChain.java    | 129 +++++++
 .../AssignmentsChainSerializer.java                |  55 +++
 .../AssignmentsChainSerializerTest.java            | 122 ++++++
 .../ItDisasterRecoveryReconfigurationTest.java     | 414 ++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   |  19 +-
 .../table/distributed/TableManagerTest.java        |   6 +-
 8 files changed, 841 insertions(+), 15 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index def5a4589d..e68c0b89f3 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -50,10 +51,13 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftError;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -270,6 +274,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(tablePartitionId);
             ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
             ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
+            ByteArray assignmentsChainKey = 
assignmentsChainKey(tablePartitionId);
 
             // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
             Map<ByteArray, Entry> values = metaStorageMgr.getAll(
@@ -278,7 +283,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                             pendingPartAssignmentsKey,
                             stablePartAssignmentsKey,
                             switchReduceKey,
-                            switchAppendKey
+                            switchAppendKey,
+                            assignmentsChainKey
                     )
             ).get();
 
@@ -287,6 +293,7 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             Entry plannedEntry = values.get(plannedPartAssignmentsKey);
             Entry switchReduceEntry = values.get(switchReduceKey);
             Entry switchAppendEntry = values.get(switchAppendKey);
+            Entry assignmentsChainEntry = values.get(assignmentsChainKey);
 
             Set<Assignment> retrievedStable = 
readAssignments(stableEntry).nodes();
             Set<Assignment> retrievedSwitchReduce = 
readAssignments(switchReduceEntry).nodes();
@@ -341,12 +348,21 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
             // All conditions combined with AND operator.
             Condition retryPreconditions = and(con1, and(con2, and(con3, 
con4)));
 
+            long catalogTimestamp = pendingAssignments.timestamp();
+
+            Assignments newStableAssignments = Assignments.of(stableFromRaft, 
catalogTimestamp);
+
+            Operation assignmentChainChangeOp = handleAssignmentsChainChange(
+                    assignmentsChainKey,
+                    assignmentsChainEntry,
+                    pendingAssignments,
+                    newStableAssignments
+            );
+
             Update successCase;
             Update failCase;
 
-            long catalogTimestamp = pendingAssignments.timestamp();
-
-            byte[] stableFromRaftByteArray = 
Assignments.toBytes(stableFromRaft, catalogTimestamp);
+            byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
             byte[] additionByteArray = 
Assignments.toBytes(calculatedPendingAddition, catalogTimestamp);
             byte[] reductionByteArray = 
Assignments.toBytes(calculatedPendingReduction, catalogTimestamp);
             byte[] switchReduceByteArray = 
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
@@ -357,7 +373,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                         put(stablePartAssignmentsKey, stableFromRaftByteArray),
                         put(pendingPartAssignmentsKey, additionByteArray),
                         put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray)
+                        put(switchAppendKey, switchAppendByteArray),
+                        assignmentChainChangeOp
                 ).yield(SWITCH_APPEND_SUCCESS);
                 failCase = ops().yield(SWITCH_APPEND_FAIL);
             } else if (!calculatedSwitchReduce.isEmpty()) {
@@ -365,7 +382,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                         put(stablePartAssignmentsKey, stableFromRaftByteArray),
                         put(pendingPartAssignmentsKey, reductionByteArray),
                         put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray)
+                        put(switchAppendKey, switchAppendByteArray),
+                        assignmentChainChangeOp
                 ).yield(SWITCH_REDUCE_SUCCESS);
                 failCase = ops().yield(SWITCH_REDUCE_FAIL);
             } else {
@@ -377,7 +395,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
                             put(pendingPartAssignmentsKey, 
plannedEntry.value()),
-                            remove(plannedPartAssignmentsKey)
+                            remove(plannedPartAssignmentsKey),
+                            assignmentChainChangeOp
                     ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
 
                     failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
@@ -387,7 +406,8 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
 
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
-                            remove(pendingPartAssignmentsKey)
+                            remove(pendingPartAssignmentsKey),
+                            assignmentChainChangeOp
                     ).yield(FINISH_REBALANCE_SUCCESS);
 
                     failCase = ops().yield(FINISH_REBALANCE_FAIL);
@@ -469,6 +489,66 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
         }
     }
 
+    private static Operation handleAssignmentsChainChange(
+            ByteArray assignmentsChainKey,
+            Entry assignmentsChainEntry,
+            Assignments pendingAssignments,
+            Assignments stableAssignments
+    ) {
+        // We write this key only in HA mode. See 
TableManager.writeTableAssignmentsToMetastore.
+        if (assignmentsChainEntry.value() != null) {
+            AssignmentsChain updatedAssignmentsChain = updateAssignmentsChain(
+                    AssignmentsChain.fromBytes(assignmentsChainEntry.value()),
+                    stableAssignments,
+                    pendingAssignments
+            );
+            return put(assignmentsChainKey, updatedAssignmentsChain.toBytes());
+        } else {
+            return Operations.noop();
+        }
+    }
+
+    private static AssignmentsChain updateAssignmentsChain(AssignmentsChain 
assignmentsChain, Assignments newStable,
+            Assignments pendingAssignments) {
+        assert assignmentsChain != null : "Assignments chain cannot be null in 
HA mode.";
+
+        assert !assignmentsChain.chain().isEmpty() : "Assignments chain cannot 
be empty on stable switch.";
+
+        /*
+            This method covers the following case:
+
+            stable = [A,B,C,D,E,F,G]
+            lost A B C D
+
+            stable = [E] pending = [E F G]
+            stable = [E F G]
+
+            E F lost
+            stable = [G]
+
+            on node G restart
+            ms.chain = [A,B,C,D,E,F,G] -> [E,F,G] -> [G]
+
+          on doStableKeySwitch
+              if !pending.force && !pending.secondPhaseOfReset
+                ms.chain = pending/stable // [A,B,C,D,E,F,G]
+              else if !pending.force && pending.secondPhaseOfReset
+                ms.chain.last = stable // [A,B,C,D,E,F,G] -> [E] => 
[A,B,C,D,E,F,G] -> [E F G]
+              else
+                ms.chain = ms.chain + pending/stable // [A,B,C,D,E,F,G] => 
[A,B,C,D,E,F,G] -> [E]
+        */
+        AssignmentsChain newAssignmentsChain;
+        if (!pendingAssignments.force() && !pendingAssignments.fromReset()) {
+            newAssignmentsChain = AssignmentsChain.of(newStable);
+        } else if (!pendingAssignments.force() && 
pendingAssignments.fromReset()) {
+            newAssignmentsChain = assignmentsChain.replaceLast(newStable);
+        } else {
+            newAssignmentsChain = assignmentsChain.addLast(newStable);
+        }
+
+        return newAssignmentsChain;
+    }
+
     /**
      * Creates a set of assignments from the given set of peers and learners.
      */
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 13b1646557..e90a6d0ef5 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -446,6 +446,10 @@ public class RebalanceUtil {
 
     public static final byte[] PENDING_CHANGE_TRIGGER_PREFIX_BYTES = 
PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
 
+    public static final String ASSIGNMENTS_CHAIN_PREFIX = "assignments.chain.";
+
+    public static final byte[] ASSIGNMENTS_CHAIN_PREFIX_BYTES = 
ASSIGNMENTS_CHAIN_PREFIX.getBytes(UTF_8);
+
     /**
      * Key that is needed for skipping stale events of pending key change.
      *
@@ -490,6 +494,17 @@ public class RebalanceUtil {
         return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
     }
 
+    /**
+     * Key for the graceful restart in HA mode.
+     *
+     * @param partId Unique identifier of a partition.
+     * @return Key for a partition.
+     * @see <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-131%3A+Partition+Majority+Unavailability+Handling";>HA
 mode</a>
+     */
+    public static ByteArray assignmentsChainKey(TablePartitionId partId) {
+        return new ByteArray(ASSIGNMENTS_CHAIN_PREFIX + partId);
+    }
+
     /**
      * Key that is needed for the rebalance algorithm.
      *
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
new file mode 100644
index 0000000000..b417c2f0b2
--- /dev/null
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains the chain of changed assignments.
+ */
+public class AssignmentsChain {
+    /** Chain of assignments. */
+    @IgniteToStringInclude
+    private final List<Assignments> chain;
+
+    private AssignmentsChain(List<Assignments> chain) {
+        this.chain = chain;
+    }
+
+    public List<Assignments> chain() {
+        return chain;
+    }
+
+    /**
+     * Creates a new {@link AssignmentsChain} with the last link in the chain 
replaced with the provided one.
+     *
+     * @param newLast New last link.
+     * @return new AssignmentsChain.
+     */
+    public AssignmentsChain replaceLast(Assignments newLast) {
+        assert !chain.isEmpty() : "Assignments chain is empty.";
+
+        List<Assignments> newChain = new ArrayList<>(chain);
+
+        newChain.set(newChain.size() - 1, newLast);
+
+        return new AssignmentsChain(newChain);
+    }
+
+    /**
+     * Create a new {@link AssignmentsChain} with a new link added to the 
chain.
+     *
+     * @param newLast New last link.
+     * @return new AssignmentsChain.
+     */
+    public AssignmentsChain addLast(Assignments newLast) {
+        assert !chain.isEmpty() : "Assignments chain is empty.";
+
+        List<Assignments> newChain = new ArrayList<>(chain);
+
+        newChain.add(newLast);
+
+        return new AssignmentsChain(newChain);
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param assignments Partition assignments.
+     */
+    public static AssignmentsChain of(Assignments assignments) {
+        return new AssignmentsChain(List.of(assignments));
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param assignmentsChain Chain of partition assignments.
+     */
+    public static AssignmentsChain of(List<Assignments> assignmentsChain) {
+        return new AssignmentsChain(assignmentsChain);
+    }
+
+    public byte[] toBytes() {
+        return VersionedSerialization.toBytes(this, 
AssignmentsChainSerializer.INSTANCE);
+    }
+
+    /**
+     * Deserializes an assignments chain from the array of bytes. Returns 
{@code null} if the argument is {@code null}.
+     */
+    @Nullable
+    @Contract("null -> null; !null -> !null")
+    public static AssignmentsChain fromBytes(byte @Nullable [] bytes) {
+        return bytes == null ? null : VersionedSerialization.fromBytes(bytes, 
AssignmentsChainSerializer.INSTANCE);
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        AssignmentsChain that = (AssignmentsChain) o;
+
+        return Objects.equals(chain, that.chain);
+    }
+
+    @Override
+    public int hashCode() {
+        return chain.hashCode();
+    }
+
+}
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
new file mode 100644
index 0000000000..9538ec2686
--- /dev/null
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+
+/**
+ * {@link VersionedSerializer} for {@link AssignmentsChain} instances.
+ */
+public class AssignmentsChainSerializer extends 
VersionedSerializer<AssignmentsChain> {
+
+    /** Serializer instance. */
+    public static final AssignmentsChainSerializer INSTANCE = new 
AssignmentsChainSerializer();
+
+    @Override
+    protected void writeExternalData(AssignmentsChain chain, IgniteDataOutput 
out) throws IOException {
+        out.writeVarInt(chain.chain().size());
+
+        for (Assignments assignment : chain.chain()) {
+            AssignmentsSerializer.INSTANCE.writeExternal(assignment, out);
+        }
+    }
+
+    @Override
+    protected AssignmentsChain readExternalData(byte protoVer, IgniteDataInput 
in) throws IOException {
+        int length = in.readVarIntAsInt();
+        List<Assignments> assignmentsChain = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            
assignmentsChain.add(AssignmentsSerializer.INSTANCE.readExternal(in));
+        }
+        return AssignmentsChain.of(assignmentsChain);
+    }
+}
diff --git 
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
 
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
new file mode 100644
index 0000000000..9a102c9e13
--- /dev/null
+++ 
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneOffset;
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+
+class AssignmentsChainSerializerTest {
+    private static final String ASSIGNMENTS_CHAIN_SERIALIZED_WITH_V1 = 
"Ae++QwMB775DAwRhYmMBBGRlZgAAUcKMAQD0BgAB775DAgRkZWYAAFHCjAEA9AYA";
+
+    private final AssignmentsChainSerializer serializer = new 
AssignmentsChainSerializer();
+
+    private static final long BASE_PHYSICAL_TIME = LocalDateTime.of(2024, 
Month.JANUARY, 1, 0, 0)
+            .atOffset(ZoneOffset.UTC)
+            .toInstant()
+            .toEpochMilli();
+
+    private static long baseTimestamp(int logical) {
+        return new HybridTimestamp(BASE_PHYSICAL_TIME, logical).longValue();
+    }
+
+    @CartesianTest
+    void serializationAndDeserialization(
+            @Values(booleans = {false, true}) boolean force,
+            @Values(booleans = {false, true}) boolean fromReset
+    ) {
+        AssignmentsChain originalAssignmentsChain =
+                AssignmentsChain.of(List.of(testAssignments1(force, 
fromReset), testAssignments2(force, fromReset)));
+
+        byte[] bytes = 
VersionedSerialization.toBytes(originalAssignmentsChain, serializer);
+        AssignmentsChain restoredAssignmentsChain = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredAssignmentsChain, 
equalTo(originalAssignmentsChain));
+    }
+
+    @Test
+    void v1CanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode(ASSIGNMENTS_CHAIN_SERIALIZED_WITH_V1);
+        AssignmentsChain restoredAssignmentsChain = 
VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertChainFromV1(restoredAssignmentsChain);
+    }
+
+    private static void assertChainFromV1(AssignmentsChain restoredChain) {
+        assertThat(restoredChain.chain(), hasSize(2));
+        assertNodes1FromV1(restoredChain.chain().get(0));
+        assertNodes2FromV1(restoredChain.chain().get(1));
+    }
+
+    private static void assertNodes1FromV1(Assignments restoredAssignments) {
+        assertThat(restoredAssignments.nodes(), hasSize(2));
+        List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+                .sorted(comparing(Assignment::consistentId))
+                .collect(toList());
+
+        Assignment assignment1 = orderedNodes.get(0);
+        assertThat(assignment1.consistentId(), is("abc"));
+        assertThat(assignment1.isPeer(), is(true));
+
+        Assignment assignment2 = orderedNodes.get(1);
+        assertThat(assignment2.consistentId(), is("def"));
+        assertThat(assignment2.isPeer(), is(false));
+    }
+
+    private static void assertNodes2FromV1(Assignments restoredAssignments) {
+        assertThat(restoredAssignments.nodes(), hasSize(1));
+        List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+                .sorted(comparing(Assignment::consistentId))
+                .collect(toList());
+
+        Assignment assignment1 = orderedNodes.get(0);
+        assertThat(assignment1.consistentId(), is("def"));
+        assertThat(assignment1.isPeer(), is(false));
+    }
+
+    private static Assignments testAssignments1(boolean force, boolean 
fromReset) {
+        Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"), 
Assignment.forLearner("def"));
+
+        return force
+                ? Assignments.forced(nodes, baseTimestamp(5))
+                : Assignments.of(nodes, baseTimestamp(5), fromReset);
+    }
+
+    private static Assignments testAssignments2(boolean force, boolean 
fromReset) {
+        Set<Assignment> nodes = Set.of(Assignment.forLearner("def"));
+
+        return force
+                ? Assignments.forced(nodes, baseTimestamp(5))
+                : Assignments.of(nodes, baseTimestamp(5), fromReset);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 75b4b23ae3..22f24c008e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -28,6 +28,8 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -59,10 +61,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -72,6 +76,8 @@ import 
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
+import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.RunnableX;
@@ -86,6 +92,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
 import 
org.apache.ignite.internal.partitiondistribution.RendezvousDistributionFunction;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.Peer;
@@ -172,8 +179,10 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         // startNodesInParallel(IntStream.range(INITIAL_NODES, 
zoneParams.nodes()).toArray());
 
         executeSql(format("CREATE ZONE %s with replicas=%d, partitions=%d,"
-                        + " data_nodes_auto_adjust_scale_down=%d, 
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s'",
-                zoneName, zoneParams.replicas(), zoneParams.partitions(), 
SCALE_DOWN_TIMEOUT_SECONDS, 1, DEFAULT_STORAGE_PROFILE
+                        + " data_nodes_auto_adjust_scale_down=%d, 
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s',"
+                        + " consistency_mode='%s'",
+                zoneName, zoneParams.replicas(), zoneParams.partitions(), 
SCALE_DOWN_TIMEOUT_SECONDS, 1, DEFAULT_STORAGE_PROFILE,
+                zoneParams.consistencyMode().name()
         ));
 
         CatalogZoneDescriptor zone = node0.catalogManager().zone(zoneName, 
node0.clock().nowLong());
@@ -957,6 +966,8 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         assertStableAssignments(node0, partId, allAssignments);
 
+        assertAssignmentsChain(node0, partId, null);
+
         // Write data(1) to all seven nodes.
         List<Throwable> errors = insertValues(table, partId, 0);
         assertThat(errors, is(empty()));
@@ -1234,6 +1245,378 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertPlannedAssignments(node0, partId, assignments13);
     }
 
+    @Test
+    @ZoneParams(nodes = 7, replicas = 3, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testAssignmentsChainUpdate() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 0, 2, 3);
+
+        Assignments initialAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, initialAssignments);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(initialAssignments));
+
+        // Write data(1) to all nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        logger().info("Stopping nodes [ids={}].", 3);
+
+        stopNode(3);
+
+        logger().info("Stopped nodes [ids={}].", 3);
+
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        ), timestamp);
+
+        assertRealAssignments(node0, partId, 0, 1, 2);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        // Graceful change should reinit the assignments chain, in other words 
there should be only one link
+        // in the chain - the current stable assignments.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments)));
+
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        // Disable automatic rebalance since we want to restore replica factor.
+        setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+
+        // Now stop the majority and the automatic reset should kick in.
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 
2}));
+
+        Assignments resetAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name())
+        ), timestamp);
+
+        AtomicBoolean blockedLink = new AtomicBoolean(true);
+
+        // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
+        blockMessage((nodeName, msg) -> blockedLink.get() && 
stableKeySwitchMessage(msg, partId, resetAssignments));
+
+        stopNodesInParallel(1, 2);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true,
+                -1
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        Assignments linkFirstPhaseReset = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, linkFirstPhaseReset, 60_000);
+
+        // Assignments chain consists of stable and the first phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments, linkFirstPhaseReset)));
+
+        // Unblock stable switch, wait for reset phase 2 assignments to 
replace phase 1 assignments in the chain.
+        blockedLink.set(false);
+
+        logger().info("Unblocked stable switch.");
+
+        assertRealAssignments(node0, partId, 0, 4, 5);
+
+        assertStableAssignments(node0, partId, resetAssignments, 60_000);
+
+        // Assignments chain consists of stable and the second phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(link2Assignments, resetAssignments)));
+    }
+
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+        Assignments allAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name()),
+                Assignment.forPeer(node(6).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, allAssignments);
+
+        // Assignments chain is equal to the stable assignments.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments));
+
+        // Write data(1) to all nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        ), timestamp);
+
+        AtomicBoolean blockedLink2 = new AtomicBoolean(true);
+
+        // Block stable switch to check that we initially add reset phase 1 
assignments to the chain.
+        blockMessage((nodeName, msg) -> blockedLink2.get() && 
stableKeySwitchMessage(msg, partId, link2Assignments));
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 
4, 5, 6}));
+
+        stopNodesInParallel(3, 4, 5, 6);
+
+        Assignments link2FirstPhaseReset = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000);
+
+        // Assignments chain consists of stable and the first phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2FirstPhaseReset)));
+
+        // Unblock stable switch, wait for reset phase 2 assignments to 
replace phase 1 assignments in the chain.
+        blockedLink2.set(false);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        // Assignments chain consists of stable and the second phase of reset.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments)));
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 
2}));
+        stopNodesInParallel(1, 2);
+
+        Assignments link3Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link3Assignments, 30_000);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments, 
link3Assignments)));
+    }
+
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testSecondResetRewritesUnfinishedFirstPhaseReset() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        // Disable automatic reset since we want to check manual ones.
+        setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+        Assignments allAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name()),
+                Assignment.forPeer(node(6).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, allAssignments);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments));
+
+        // Write data(1) to all seven nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        Assignments blockedRebalance = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        );
+
+        blockRebalanceStableSwitch(partId, blockedRebalance);
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 
4, 5, 6}));
+        stopNodesInParallel(3, 4, 5, 6);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true,
+                -1
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        // First phase of reset. The second phase stable switch is blocked.
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments)));
+
+        Assignments assignmentsPending = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name())
+        ), timestamp, true);
+
+        assertPendingAssignments(node0, partId, assignmentsPending);
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{ 
2}));
+        stopNode(2);
+
+        CompletableFuture<?> updateFuture2 = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true,
+                -1
+        );
+
+        assertThat(updateFuture2, willCompleteSuccessfully());
+
+        Assignments link3Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, link3Assignments, 30_000);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(allAssignments, link2Assignments, 
link3Assignments)));
+    }
+
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
+    void testGracefulRewritesChainAfterForceReset() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = igniteImpl(0);
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        // Disable automatic reset since we want to check manual ones.
+        setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+        // Disable scale down to avoid unwanted rebalance.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        assertRealAssignments(node0, partId, 2, 3, 5);
+
+        Assignments initialAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(5).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, initialAssignments);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(initialAssignments));
+
+        // Write data(1) to all nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{2, 
3}));
+
+        stopNodesInParallel(2, 3);
+
+        logger().info("Stopped nodes [ids={}].", Arrays.toString(new int[]{2, 
3}));
+
+        CompletableFuture<?> updateFuture2 = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true,
+                -1
+        );
+
+        assertThat(updateFuture2, willCompleteSuccessfully());
+
+        Assignments link2Assignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(5).name())
+        ), timestamp);
+
+        assertRealAssignments(node0, partId, 0, 1, 5);
+
+        assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(initialAssignments, link2Assignments)));
+
+        // Return back scale down.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, 1));
+
+        // Now stop one and check graceful rebalance.
+        logger().info("Stopping nodes [ids={}].", 1);
+
+        stopNode(1);
+
+        Assignments finalAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name())
+        ), timestamp);
+
+        assertRealAssignments(node0, partId, 0, 4, 5);
+
+        assertStableAssignments(node0, partId, finalAssignments, 30_000);
+
+        // Graceful change should reinit the assignments chain, in other words 
there should be only one link
+        // in the chain - the current stable assignments.
+        assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(List.of(finalAssignments)));
+    }
+
+    private void setDistributionResetTimeout(IgniteImpl node, long timeout) {
+        CompletableFuture<Void> changeFuture = node
+                .clusterConfiguration()
+                .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+                .system().change(c0 -> c0.changeProperties()
+                        .createOrUpdate(PARTITION_DISTRIBUTION_RESET_TIMEOUT,
+                                c1 -> 
c1.changePropertyValue(String.valueOf(timeout)))
+                );
+
+        assertThat(changeFuture, willCompleteSuccessfully());
+    }
+
     private static String getPendingNodeName(List<String> aliveNodes, String 
blockedNode) {
         List<String> candidates = new ArrayList<>(aliveNodes);
         candidates.remove(blockedNode);
@@ -1403,12 +1786,24 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
     }
 
     private void assertStableAssignments(IgniteImpl node0, int partId, 
Assignments expected) throws InterruptedException {
+        assertStableAssignments(node0, partId, expected, 2000);
+    }
+
+    private void assertStableAssignments(IgniteImpl node0, int partId, 
Assignments expected, long timeoutMillis)
+            throws InterruptedException {
         assertTrue(
-                waitForCondition(() -> 
expected.equals(getStableAssignments(node0, partId)), 2000),
+                waitForCondition(() -> 
expected.equals(getStableAssignments(node0, partId)), timeoutMillis),
                 () -> "Expected: " + expected + ", actual: " + 
getStableAssignments(node0, partId)
         );
     }
 
+    private void assertAssignmentsChain(IgniteImpl node0, int partId, 
@Nullable AssignmentsChain expected) throws InterruptedException {
+        assertTrue(
+                waitForCondition(() -> Objects.equals(expected, 
getAssignmentsChain(node0, partId)), 2000),
+                () -> "Expected: " + expected + ", actual: " + 
getAssignmentsChain(node0, partId)
+        );
+    }
+
     /**
      * Inserts {@value ENTRIES} values into a table, expecting either a 
success or specific set of exceptions that would indicate
      * replication issues. Collects such exceptions into a list and returns. 
Fails if unexpected exception happened.
@@ -1542,6 +1937,17 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         return stable.empty() ? null : Assignments.fromBytes(stable.value());
     }
 
+    private @Nullable AssignmentsChain getAssignmentsChain(IgniteImpl node, 
int partId) {
+        CompletableFuture<Entry> chainFut = node.metaStorageManager()
+                .get(assignmentsChainKey(new TablePartitionId(tableId, 
partId)));
+
+        assertThat(chainFut, willCompleteSuccessfully());
+
+        Entry chain = chainFut.join();
+
+        return chain.empty() ? null : 
AssignmentsChain.fromBytes(chain.value());
+    }
+
     @Retention(RetentionPolicy.RUNTIME)
     @interface ZoneParams {
         int replicas();
@@ -1549,5 +1955,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         int partitions();
 
         int nodes() default INITIAL_NODES;
+
+        ConsistencyMode consistencyMode() default 
ConsistencyMode.STRONG_CONSISTENCY;
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2bae27d71c..91beb10eec 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -33,6 +33,7 @@ import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dep
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
@@ -144,6 +146,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicat
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
 import 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -983,18 +986,30 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      */
     public CompletableFuture<List<Assignments>> 
writeTableAssignmentsToMetastore(
             int tableId,
+            ConsistencyMode consistencyMode,
             CompletableFuture<List<Assignments>> assignmentsFuture
     ) {
         return assignmentsFuture.thenCompose(newAssignments -> {
             assert !newAssignments.isEmpty();
 
+            boolean haMode = consistencyMode == 
ConsistencyMode.HIGH_AVAILABILITY;
+
             List<Operation> partitionAssignments = new 
ArrayList<>(newAssignments.size());
 
             for (int i = 0; i < newAssignments.size(); i++) {
-                ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new 
TablePartitionId(tableId, i));
+                TablePartitionId tablePartitionId = new 
TablePartitionId(tableId, i);
+
+                ByteArray stableAssignmentsKey = 
stablePartAssignmentsKey(tablePartitionId);
                 byte[] anAssignment = newAssignments.get(i).toBytes();
                 Operation op = put(stableAssignmentsKey, anAssignment);
                 partitionAssignments.add(op);
+
+                if (haMode) {
+                    ByteArray assignmentsChainKey = 
assignmentsChainKey(tablePartitionId);
+                    byte[] assignmentChain = 
AssignmentsChain.of(newAssignments.get(i)).toBytes();
+                    Operation chainOp = put(assignmentsChainKey, 
assignmentChain);
+                    partitionAssignments.add(chainOp);
+                }
             }
 
             Condition condition = notExists(new 
ByteArray(toByteArray(partitionAssignments.get(0).key())));
@@ -1647,7 +1662,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     tablePendingAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken);
 
             CompletableFuture<List<Assignments>> 
stableAssignmentsFutureAfterInvoke =
-                    writeTableAssignmentsToMetastore(tableId, 
stableAssignmentsFuture);
+                    writeTableAssignmentsToMetastore(tableId, 
zoneDescriptor.consistencyMode(), stableAssignmentsFuture);
 
             Catalog catalog = catalogService.catalog(catalogVersion);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 6db3ecdce5..7f692452ca 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.causality.RevisionListenerRegistry;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -404,7 +405,7 @@ public class TableManagerTest extends IgniteAbstractTest {
         var outerExceptionMsg = "Outer future is interrupted";
         assignmentsFuture.completeExceptionally(new 
TimeoutException(outerExceptionMsg));
         CompletableFuture<List<Assignments>> writtenAssignmentsFuture = 
tableManager
-                .writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+                .writeTableAssignmentsToMetastore(tableId, 
ConsistencyMode.STRONG_CONSISTENCY, assignmentsFuture);
         assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
         assertThrowsWithCause(writtenAssignmentsFuture::get, 
TimeoutException.class, outerExceptionMsg);
 
@@ -414,7 +415,8 @@ public class TableManagerTest extends IgniteAbstractTest {
         var innerExceptionMsg = "Inner future is interrupted";
         invokeTimeoutFuture.completeExceptionally(new 
TimeoutException(innerExceptionMsg));
         when(msm.invoke(any(), anyList(), 
anyList())).thenReturn(invokeTimeoutFuture);
-        writtenAssignmentsFuture = 
tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+        writtenAssignmentsFuture =
+                tableManager.writeTableAssignmentsToMetastore(tableId, 
ConsistencyMode.STRONG_CONSISTENCY, assignmentsFuture);
         assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
         assertThrowsWithCause(writtenAssignmentsFuture::get, 
TimeoutException.class, innerExceptionMsg);
     }

Reply via email to