This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0280dc5446 IGNITE-23870 Implement a method to receive a chain of
topology changes (#4966)
0280dc5446 is described below
commit 0280dc5446fafd63e067d78c0901369f2d235a6f
Author: Cyrill <[email protected]>
AuthorDate: Mon Dec 30 12:35:21 2024 +0300
IGNITE-23870 Implement a method to receive a chain of topology changes
(#4966)
---
.../RebalanceRaftGroupEventsListener.java | 96 ++++-
.../distributionzones/rebalance/RebalanceUtil.java | 15 +
.../partitiondistribution/AssignmentsChain.java | 129 +++++++
.../AssignmentsChainSerializer.java | 55 +++
.../AssignmentsChainSerializerTest.java | 122 ++++++
.../ItDisasterRecoveryReconfigurationTest.java | 414 ++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 19 +-
.../table/distributed/TableManagerTest.java | 6 +-
8 files changed, 841 insertions(+), 15 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index def5a4589d..e68c0b89f3 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -50,10 +51,13 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Condition;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftError;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -270,6 +274,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(tablePartitionId);
ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
+ ByteArray assignmentsChainKey =
assignmentsChainKey(tablePartitionId);
// TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
Map<ByteArray, Entry> values = metaStorageMgr.getAll(
@@ -278,7 +283,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
pendingPartAssignmentsKey,
stablePartAssignmentsKey,
switchReduceKey,
- switchAppendKey
+ switchAppendKey,
+ assignmentsChainKey
)
).get();
@@ -287,6 +293,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
Entry plannedEntry = values.get(plannedPartAssignmentsKey);
Entry switchReduceEntry = values.get(switchReduceKey);
Entry switchAppendEntry = values.get(switchAppendKey);
+ Entry assignmentsChainEntry = values.get(assignmentsChainKey);
Set<Assignment> retrievedStable =
readAssignments(stableEntry).nodes();
Set<Assignment> retrievedSwitchReduce =
readAssignments(switchReduceEntry).nodes();
@@ -341,12 +348,21 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
// All conditions combined with AND operator.
Condition retryPreconditions = and(con1, and(con2, and(con3,
con4)));
+ long catalogTimestamp = pendingAssignments.timestamp();
+
+ Assignments newStableAssignments = Assignments.of(stableFromRaft,
catalogTimestamp);
+
+ Operation assignmentChainChangeOp = handleAssignmentsChainChange(
+ assignmentsChainKey,
+ assignmentsChainEntry,
+ pendingAssignments,
+ newStableAssignments
+ );
+
Update successCase;
Update failCase;
- long catalogTimestamp = pendingAssignments.timestamp();
-
- byte[] stableFromRaftByteArray =
Assignments.toBytes(stableFromRaft, catalogTimestamp);
+ byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
byte[] additionByteArray =
Assignments.toBytes(calculatedPendingAddition, catalogTimestamp);
byte[] reductionByteArray =
Assignments.toBytes(calculatedPendingReduction, catalogTimestamp);
byte[] switchReduceByteArray =
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
@@ -357,7 +373,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
put(stablePartAssignmentsKey, stableFromRaftByteArray),
put(pendingPartAssignmentsKey, additionByteArray),
put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray)
+ put(switchAppendKey, switchAppendByteArray),
+ assignmentChainChangeOp
).yield(SWITCH_APPEND_SUCCESS);
failCase = ops().yield(SWITCH_APPEND_FAIL);
} else if (!calculatedSwitchReduce.isEmpty()) {
@@ -365,7 +382,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
put(stablePartAssignmentsKey, stableFromRaftByteArray),
put(pendingPartAssignmentsKey, reductionByteArray),
put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray)
+ put(switchAppendKey, switchAppendByteArray),
+ assignmentChainChangeOp
).yield(SWITCH_REDUCE_SUCCESS);
failCase = ops().yield(SWITCH_REDUCE_FAIL);
} else {
@@ -377,7 +395,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
put(pendingPartAssignmentsKey,
plannedEntry.value()),
- remove(plannedPartAssignmentsKey)
+ remove(plannedPartAssignmentsKey),
+ assignmentChainChangeOp
).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
@@ -387,7 +406,8 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
- remove(pendingPartAssignmentsKey)
+ remove(pendingPartAssignmentsKey),
+ assignmentChainChangeOp
).yield(FINISH_REBALANCE_SUCCESS);
failCase = ops().yield(FINISH_REBALANCE_FAIL);
@@ -469,6 +489,66 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
}
+ private static Operation handleAssignmentsChainChange( // Returns the operation to apply to the assignments chain key.
+ ByteArray assignmentsChainKey,
+ Entry assignmentsChainEntry,
+ Assignments pendingAssignments,
+ Assignments stableAssignments
+ ) {
+ // We write this key only in HA mode. See
TableManager.writeTableAssignmentsToMetastore.
+ if (assignmentsChainEntry.value() != null) {
+ AssignmentsChain updatedAssignmentsChain = updateAssignmentsChain( // Parameter order: chain, new stable, pending.
+ AssignmentsChain.fromBytes(assignmentsChainEntry.value()),
+ stableAssignments,
+ pendingAssignments
+ );
+ return put(assignmentsChainKey, updatedAssignmentsChain.toBytes()); // Overwrite the stored chain with the updated one.
+ } else {
+ return Operations.noop(); // The entry holds no chain, so there is nothing to update.
+ }
+ }
+
+ private static AssignmentsChain updateAssignmentsChain(AssignmentsChain
assignmentsChain, Assignments newStable,
+ Assignments pendingAssignments) {
+ assert assignmentsChain != null : "Assignments chain cannot be null in
HA mode.";
+
+ assert !assignmentsChain.chain().isEmpty() : "Assignments chain cannot
be empty on stable switch.";
+
+ /*
+ Computes the chain that goes with the new stable assignments. The given chain is not modified:
+ every branch below produces a new AssignmentsChain instance. Which branch is taken depends only on
+ the force() and fromReset() flags of the pending assignments.
+
+ This method covers the following case:
+
+ stable = [A,B,C,D,E,F,G]
+ lost A B C D
+
+ stable = [E] pending = [E F G]
+ stable = [E F G]
+
+ E F lost
+ stable = [G]
+
+ on node G restart
+ ms.chain = [A,B,C,D,E,F,G] -> [E,F,G] -> [G]
+
+ on doStableKeySwitch
+ if !pending.force && !pending.secondPhaseOfReset
+ ms.chain = pending/stable // [A,B,C,D,E,F,G]
+ else if !pending.force && pending.secondPhaseOfReset
+ ms.chain.last = stable // [A,B,C,D,E,F,G] -> [E] =>
[A,B,C,D,E,F,G] -> [E F G]
+ else
+ ms.chain = ms.chain + pending/stable // [A,B,C,D,E,F,G] =>
[A,B,C,D,E,F,G] -> [E]
+
+ In the code below "pending.secondPhaseOfReset" is pendingAssignments.fromReset().
+ */
+ AssignmentsChain newAssignmentsChain;
+ if (!pendingAssignments.force() && !pendingAssignments.fromReset()) {
+ newAssignmentsChain = AssignmentsChain.of(newStable); // Start over with a single link.
+ } else if (!pendingAssignments.force() &&
pendingAssignments.fromReset()) {
+ newAssignmentsChain = assignmentsChain.replaceLast(newStable); // Swap the last link only.
+ } else {
+ newAssignmentsChain = assignmentsChain.addLast(newStable); // Reached only when force() is true: append a link.
+ }
+
+ return newAssignmentsChain;
+ }
+
/**
* Creates a set of assignments from the given set of peers and learners.
*/
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 13b1646557..e90a6d0ef5 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -446,6 +446,10 @@ public class RebalanceUtil {
public static final byte[] PENDING_CHANGE_TRIGGER_PREFIX_BYTES =
PENDING_CHANGE_TRIGGER_PREFIX.getBytes(UTF_8);
+ public static final String ASSIGNMENTS_CHAIN_PREFIX = "assignments.chain."; // Key prefix used by assignmentsChainKey.
+
+ public static final byte[] ASSIGNMENTS_CHAIN_PREFIX_BYTES =
ASSIGNMENTS_CHAIN_PREFIX.getBytes(UTF_8); // The same prefix as UTF-8 bytes.
+
/**
* Key that is needed for skipping stale events of pending key change.
*
@@ -490,6 +494,17 @@ public class RebalanceUtil {
return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
}
+ /**
+ * Key for the graceful restart in HA mode.
+ *
+ * <p>The key is {@link #ASSIGNMENTS_CHAIN_PREFIX} followed by the string form of {@code partId}.
+ *
+ * @param partId Unique identifier of a partition.
+ * @return Key for a partition.
+ * @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-131%3A+Partition+Majority+Unavailability+Handling">HA
mode</a>
+ */
+ public static ByteArray assignmentsChainKey(TablePartitionId partId) {
+ return new ByteArray(ASSIGNMENTS_CHAIN_PREFIX + partId);
+ }
+
/**
* Key that is needed for the rebalance algorithm.
*
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
new file mode 100644
index 0000000000..b417c2f0b2
--- /dev/null
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains the chain of changed assignments.
+ *
+ * <p>The chain is an ordered list of {@link Assignments} links; {@link #addLast} puts a new link at the end.
+ * {@link #replaceLast} and {@link #addLast} leave the receiver untouched: they copy the list and return a new
+ * instance. The list is never copied defensively, though: {@link #of(List)} keeps the list it is given and
+ * {@link #chain()} hands out the backing list itself.
+ *
+ * <p>Byte conversion goes through {@link AssignmentsChainSerializer}, see {@link #toBytes()} and
+ * {@link #fromBytes(byte[])}.
+ */
+public class AssignmentsChain {
+ /** Chain of assignments. */
+ @IgniteToStringInclude
+ private final List<Assignments> chain;
+
+ private AssignmentsChain(List<Assignments> chain) {
+ this.chain = chain; // Kept as is, no defensive copy.
+ }
+
+ public List<Assignments> chain() {
+ return chain; // The backing list, not a copy.
+ }
+
+ /**
+ * Create a new {@link AssignmentsChain} with the last link in the chain replaced with the provided one.
+ * This chain is not modified. The chain must not be empty, which is checked with an {@code assert} only.
+ *
+ * @param newLast New last link.
+ * @return new AssignmentsChain.
+ */
+ public AssignmentsChain replaceLast(Assignments newLast) {
+ assert !chain.isEmpty() : "Assignments chain is empty.";
+
+ List<Assignments> newChain = new ArrayList<>(chain);
+
+ newChain.set(newChain.size() - 1, newLast);
+
+ return new AssignmentsChain(newChain);
+ }
+
+ /**
+ * Create a new {@link AssignmentsChain} with a new link added to the end of the chain.
+ * This chain is not modified. The chain must not be empty, which is checked with an {@code assert} only.
+ *
+ * @param newLast New last link.
+ * @return new AssignmentsChain.
+ */
+ public AssignmentsChain addLast(Assignments newLast) {
+ assert !chain.isEmpty() : "Assignments chain is empty.";
+
+ List<Assignments> newChain = new ArrayList<>(chain);
+
+ newChain.add(newLast);
+
+ return new AssignmentsChain(newChain);
+ }
+
+ /**
+ * Creates a new instance whose chain consists of the single given link.
+ * The chain is backed by {@link List#of(Object)} and is therefore unmodifiable.
+ *
+ * @param assignments Partition assignments.
+ */
+ public static AssignmentsChain of(Assignments assignments) {
+ return new AssignmentsChain(List.of(assignments));
+ }
+
+ /**
+ * Creates a new instance backed by the given list. The list is used as is, it is not copied.
+ *
+ * @param assignmentsChain Chain of partition assignments.
+ */
+ public static AssignmentsChain of(List<Assignments> assignmentsChain) {
+ return new AssignmentsChain(assignmentsChain);
+ }
+
+ public byte[] toBytes() { // Counterpart of fromBytes(byte[]).
+ return VersionedSerialization.toBytes(this,
AssignmentsChainSerializer.INSTANCE);
+ }
+
+ /**
+ * Deserializes an assignments chain from the array of bytes. Returns {@code null} if the argument is {@code null}.
+ */
+ @Nullable
+ @Contract("null -> null; !null -> !null")
+ public static AssignmentsChain fromBytes(byte @Nullable [] bytes) {
+ return bytes == null ? null : VersionedSerialization.fromBytes(bytes,
AssignmentsChainSerializer.INSTANCE);
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) { // Only an instance of exactly the same class can be equal.
+ return false;
+ }
+
+ AssignmentsChain that = (AssignmentsChain) o;
+
+ return Objects.equals(chain, that.chain); // Equality is decided by the lists alone.
+ }
+
+ @Override
+ public int hashCode() {
+ return chain.hashCode();
+ }
+
+}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
new file mode 100644
index 0000000000..9538ec2686
--- /dev/null
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+import org.apache.ignite.internal.versioned.VersionedSerializer;
+
+
+/**
+ * {@link VersionedSerializer} for {@link AssignmentsChain} instances.
+ *
+ * <p>The data written by this class is the number of links as a var-int, followed by every link in chain order,
+ * each one written with {@link AssignmentsSerializer}. Reading mirrors this layout and does not look at the
+ * protocol version it is given.
+ */
+public class AssignmentsChainSerializer extends
VersionedSerializer<AssignmentsChain> {
+
+ /** Serializer instance. */
+ public static final AssignmentsChainSerializer INSTANCE = new
AssignmentsChainSerializer();
+
+ @Override
+ protected void writeExternalData(AssignmentsChain chain, IgniteDataOutput
out) throws IOException {
+ out.writeVarInt(chain.chain().size()); // Link count first, so that the reader knows how many links follow.
+
+ for (Assignments assignment : chain.chain()) {
+ AssignmentsSerializer.INSTANCE.writeExternal(assignment, out);
+ }
+ }
+
+ @Override
+ protected AssignmentsChain readExternalData(byte protoVer, IgniteDataInput
in) throws IOException {
+ int length = in.readVarIntAsInt(); // Link count written by writeExternalData.
+ List<Assignments> assignmentsChain = new ArrayList<>(length);
+
+ for (int i = 0; i < length; i++) {
+
assignmentsChain.add(AssignmentsSerializer.INSTANCE.readExternal(in));
+ }
+ return AssignmentsChain.of(assignmentsChain); // The freshly built list becomes the chain's backing list.
+ }
+}
diff --git
a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
new file mode 100644
index 0000000000..9a102c9e13
--- /dev/null
+++
b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChainSerializerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneOffset;
+import java.util.Base64;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+
+class AssignmentsChainSerializerTest { // Round-trip and stored-bytes checks for AssignmentsChainSerializer.
+ private static final String ASSIGNMENTS_CHAIN_SERIALIZED_WITH_V1 =
"Ae++QwMB775DAwRhYmMBBGRlZgAAUcKMAQD0BgAB775DAgRkZWYAAFHCjAEA9AYA"; // Base64 text decoded by v1CanBeDeserialized.
+
+ private final AssignmentsChainSerializer serializer = new
AssignmentsChainSerializer();
+
+ private static final long BASE_PHYSICAL_TIME = LocalDateTime.of(2024,
Month.JANUARY, 1, 0, 0)
+ .atOffset(ZoneOffset.UTC)
+ .toInstant()
+ .toEpochMilli(); // 2024-01-01T00:00 UTC as epoch milliseconds.
+
+ private static long baseTimestamp(int logical) {
+ return new HybridTimestamp(BASE_PHYSICAL_TIME, logical).longValue();
+ }
+
+ @CartesianTest
+ void serializationAndDeserialization( // Runs for every combination of the two flags below.
+ @Values(booleans = {false, true}) boolean force,
+ @Values(booleans = {false, true}) boolean fromReset
+ ) {
+ AssignmentsChain originalAssignmentsChain =
+ AssignmentsChain.of(List.of(testAssignments1(force,
fromReset), testAssignments2(force, fromReset)));
+
+ byte[] bytes =
VersionedSerialization.toBytes(originalAssignmentsChain, serializer);
+ AssignmentsChain restoredAssignmentsChain =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredAssignmentsChain,
equalTo(originalAssignmentsChain)); // The round trip must give back an equal chain.
+ }
+
+ @Test
+ void v1CanBeDeserialized() { // Reads the fixed Base64 constant instead of freshly serialized bytes.
+ byte[] bytes =
Base64.getDecoder().decode(ASSIGNMENTS_CHAIN_SERIALIZED_WITH_V1);
+ AssignmentsChain restoredAssignmentsChain =
VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertChainFromV1(restoredAssignmentsChain);
+ }
+
+ private static void assertChainFromV1(AssignmentsChain restoredChain) {
+ assertThat(restoredChain.chain(), hasSize(2)); // Two links are expected, checked one by one below.
+ assertNodes1FromV1(restoredChain.chain().get(0));
+ assertNodes2FromV1(restoredChain.chain().get(1));
+ }
+
+ private static void assertNodes1FromV1(Assignments restoredAssignments) {
+ assertThat(restoredAssignments.nodes(), hasSize(2));
+ List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+ .sorted(comparing(Assignment::consistentId))
+ .collect(toList()); // Sorted by consistent ID so that the index-based checks below are stable.
+
+ Assignment assignment1 = orderedNodes.get(0);
+ assertThat(assignment1.consistentId(), is("abc"));
+ assertThat(assignment1.isPeer(), is(true));
+
+ Assignment assignment2 = orderedNodes.get(1);
+ assertThat(assignment2.consistentId(), is("def"));
+ assertThat(assignment2.isPeer(), is(false));
+ }
+
+ private static void assertNodes2FromV1(Assignments restoredAssignments) {
+ assertThat(restoredAssignments.nodes(), hasSize(1));
+ List<Assignment> orderedNodes = restoredAssignments.nodes().stream()
+ .sorted(comparing(Assignment::consistentId))
+ .collect(toList());
+
+ Assignment assignment1 = orderedNodes.get(0);
+ assertThat(assignment1.consistentId(), is("def"));
+ assertThat(assignment1.isPeer(), is(false));
+ }
+
+ private static Assignments testAssignments1(boolean force, boolean
fromReset) {
+ Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"),
Assignment.forLearner("def"));
+
+ return force
+ ? Assignments.forced(nodes, baseTimestamp(5)) // fromReset is not passed on when force is set.
+ : Assignments.of(nodes, baseTimestamp(5), fromReset);
+ }
+
+ private static Assignments testAssignments2(boolean force, boolean
fromReset) {
+ Set<Assignment> nodes = Set.of(Assignment.forLearner("def"));
+
+ return force
+ ? Assignments.forced(nodes, baseTimestamp(5)) // fromReset is not passed on when force is set.
+ : Assignments.of(nodes, baseTimestamp(5), fromReset);
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 75b4b23ae3..22f24c008e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -28,6 +28,8 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -59,10 +61,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -72,6 +76,8 @@ import
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
+import
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.RunnableX;
@@ -86,6 +92,7 @@ import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import
org.apache.ignite.internal.partitiondistribution.RendezvousDistributionFunction;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Peer;
@@ -172,8 +179,10 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
// startNodesInParallel(IntStream.range(INITIAL_NODES,
zoneParams.nodes()).toArray());
executeSql(format("CREATE ZONE %s with replicas=%d, partitions=%d,"
- + " data_nodes_auto_adjust_scale_down=%d,
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s'",
- zoneName, zoneParams.replicas(), zoneParams.partitions(),
SCALE_DOWN_TIMEOUT_SECONDS, 1, DEFAULT_STORAGE_PROFILE
+ + " data_nodes_auto_adjust_scale_down=%d,
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s',"
+ + " consistency_mode='%s'",
+ zoneName, zoneParams.replicas(), zoneParams.partitions(),
SCALE_DOWN_TIMEOUT_SECONDS, 1, DEFAULT_STORAGE_PROFILE,
+ zoneParams.consistencyMode().name()
));
CatalogZoneDescriptor zone = node0.catalogManager().zone(zoneName,
node0.clock().nowLong());
@@ -957,6 +966,8 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
assertStableAssignments(node0, partId, allAssignments);
+ assertAssignmentsChain(node0, partId, null);
+
// Write data(1) to all seven nodes.
List<Throwable> errors = insertValues(table, partId, 0);
assertThat(errors, is(empty()));
@@ -1234,6 +1245,378 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertPlannedAssignments(node0, partId, assignments13);
}
+ @Test
+ @ZoneParams(nodes = 7, replicas = 3, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
+ void testAssignmentsChainUpdate() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = igniteImpl(0);
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 0, 2, 3);
+
+ Assignments initialAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, initialAssignments);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(initialAssignments));
+
+ // Write data(1) to all nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ logger().info("Stopping nodes [ids={}].", 3);
+
+ stopNode(3);
+
+ logger().info("Stopped nodes [ids={}].", 3);
+
+ Assignments link2Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ ), timestamp);
+
+ assertRealAssignments(node0, partId, 0, 1, 2);
+
+ assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+ // Graceful change should reinit the assignments chain, in other words
there should be only one link
+ // in the chain - the current stable assignments.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(link2Assignments)));
+
+ // Disable scale down to avoid unwanted rebalance.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ // Disable automatic rebalance since we want to restore replica factor.
+ setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+
+ // Now stop the majority and the automatic reset should kick in.
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1,
2}));
+
+ Assignments resetAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name())
+ ), timestamp);
+
+ AtomicBoolean blockedLink = new AtomicBoolean(true);
+
+ // Block stable switch to check that we initially add reset phase 1
assignments to the chain.
+ blockMessage((nodeName, msg) -> blockedLink.get() &&
stableKeySwitchMessage(msg, partId, resetAssignments));
+
+ stopNodesInParallel(1, 2);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true,
+ -1
+ );
+
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ Assignments linkFirstPhaseReset = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, linkFirstPhaseReset, 60_000);
+
+ // Assignments chain consists of stable and the first phase of reset.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(link2Assignments, linkFirstPhaseReset)));
+
+ // Unblock stable switch, wait for reset phase 2 assignments to
replace phase 1 assignments in the chain.
+ blockedLink.set(false);
+
+ logger().info("Unblocked stable switch.");
+
+ assertRealAssignments(node0, partId, 0, 4, 5);
+
+ assertStableAssignments(node0, partId, resetAssignments, 60_000);
+
+ // Assignments chain consists of stable and the second phase of reset.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(link2Assignments, resetAssignments)));
+ }
+
+ @Test
+ @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
+ void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = igniteImpl(0);
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ // Disable scale down to avoid unwanted rebalance.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+ Assignments allAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name()),
+ Assignment.forPeer(node(6).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, allAssignments);
+
+ // Assignments chain is equal to the stable assignments.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments));
+
+ // Write data(1) to all nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ Assignments link2Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ ), timestamp);
+
+ AtomicBoolean blockedLink2 = new AtomicBoolean(true);
+
+ // Block stable switch to check that we initially add reset phase 1
assignments to the chain.
+ blockMessage((nodeName, msg) -> blockedLink2.get() &&
stableKeySwitchMessage(msg, partId, link2Assignments));
+
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3,
4, 5, 6}));
+
+ stopNodesInParallel(3, 4, 5, 6);
+
+ Assignments link2FirstPhaseReset = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000);
+
+ // Assignments chain consists of stable and the first phase of reset.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(allAssignments, link2FirstPhaseReset)));
+
+ // Unblock stable switch, wait for reset phase 2 assignments to
replace phase 1 assignments in the chain.
+ blockedLink2.set(false);
+
+ assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+ // Assignments chain consists of stable and the second phase of reset.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(allAssignments, link2Assignments)));
+
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1,
2}));
+ stopNodesInParallel(1, 2);
+
+ Assignments link3Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, link3Assignments, 30_000);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(allAssignments, link2Assignments,
link3Assignments)));
+ }
+
+ @Test
+ @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
+ void testSecondResetRewritesUnfinishedFirstPhaseReset() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = igniteImpl(0);
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ // Disable automatic reset since we want to check manual ones.
+ setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+ // Disable scale down to avoid unwanted rebalance.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+ Assignments allAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name()),
+ Assignment.forPeer(node(6).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, allAssignments);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments));
+
+ // Write data(1) to all seven nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ Assignments blockedRebalance = Assignments.of(timestamp,
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ );
+
+ blockRebalanceStableSwitch(partId, blockedRebalance);
+
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3,
4, 5, 6}));
+ stopNodesInParallel(3, 4, 5, 6);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true,
+ -1
+ );
+
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ // First phase of reset. The second phase stable switch is blocked.
+ Assignments link2Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(allAssignments, link2Assignments)));
+
+ Assignments assignmentsPending = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name())
+ ), timestamp, true);
+
+ assertPendingAssignments(node0, partId, assignmentsPending);
+
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{
2}));
+ stopNode(2);
+
+ CompletableFuture<?> updateFuture2 =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true,
+ -1
+ );
+
+ assertThat(updateFuture2, willCompleteSuccessfully());
+
+ Assignments link3Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, link3Assignments, 30_000);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(allAssignments, link2Assignments,
link3Assignments)));
+ }
+
+ @Test
+ @ZoneParams(nodes = 6, replicas = 3, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
+ void testGracefulRewritesChainAfterForceReset() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = igniteImpl(0);
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ // Disable automatic reset since we want to check manual ones.
+ setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE);
+ // Disable scale down to avoid unwanted rebalance.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ assertRealAssignments(node0, partId, 2, 3, 5);
+
+ Assignments initialAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(5).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, initialAssignments);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(initialAssignments));
+
+ // Write data(1) to all nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{2,
3}));
+
+ stopNodesInParallel(2, 3);
+
+ logger().info("Stopped nodes [ids={}].", Arrays.toString(new int[]{2,
3}));
+
+ CompletableFuture<?> updateFuture2 =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true,
+ -1
+ );
+
+ assertThat(updateFuture2, willCompleteSuccessfully());
+
+ Assignments link2Assignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(5).name())
+ ), timestamp);
+
+ assertRealAssignments(node0, partId, 0, 1, 5);
+
+ assertStableAssignments(node0, partId, link2Assignments, 30_000);
+
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(initialAssignments, link2Assignments)));
+
+ // Return back scale down.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, 1));
+
+ // Now stop one and check graceful rebalance.
+ logger().info("Stopping nodes [ids={}].", 1);
+
+ stopNode(1);
+
+ Assignments finalAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name())
+ ), timestamp);
+
+ assertRealAssignments(node0, partId, 0, 4, 5);
+
+ assertStableAssignments(node0, partId, finalAssignments, 30_000);
+
+ // Graceful change should reinit the assignments chain, in other words
there should be only one link
+ // in the chain - the current stable assignments.
+ assertAssignmentsChain(node0, partId,
AssignmentsChain.of(List.of(finalAssignments)));
+ }
+
+ private void setDistributionResetTimeout(IgniteImpl node, long timeout) {
+ CompletableFuture<Void> changeFuture = node
+ .clusterConfiguration()
+ .getConfiguration(SystemDistributedExtensionConfiguration.KEY)
+ .system().change(c0 -> c0.changeProperties()
+ .createOrUpdate(PARTITION_DISTRIBUTION_RESET_TIMEOUT,
+ c1 ->
c1.changePropertyValue(String.valueOf(timeout)))
+ );
+
+ assertThat(changeFuture, willCompleteSuccessfully());
+ }
+
private static String getPendingNodeName(List<String> aliveNodes, String
blockedNode) {
List<String> candidates = new ArrayList<>(aliveNodes);
candidates.remove(blockedNode);
@@ -1403,12 +1786,24 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
}
private void assertStableAssignments(IgniteImpl node0, int partId,
Assignments expected) throws InterruptedException {
+ assertStableAssignments(node0, partId, expected, 2000);
+ }
+
+ private void assertStableAssignments(IgniteImpl node0, int partId,
Assignments expected, long timeoutMillis)
+ throws InterruptedException {
assertTrue(
- waitForCondition(() ->
expected.equals(getStableAssignments(node0, partId)), 2000),
+ waitForCondition(() ->
expected.equals(getStableAssignments(node0, partId)), timeoutMillis),
() -> "Expected: " + expected + ", actual: " +
getStableAssignments(node0, partId)
);
}
+ private void assertAssignmentsChain(IgniteImpl node0, int partId,
@Nullable AssignmentsChain expected) throws InterruptedException {
+ assertTrue(
+ waitForCondition(() -> Objects.equals(expected,
getAssignmentsChain(node0, partId)), 2000),
+ () -> "Expected: " + expected + ", actual: " +
getAssignmentsChain(node0, partId)
+ );
+ }
+
/**
* Inserts {@value ENTRIES} values into a table, expecting either a
success or specific set of exceptions that would indicate
* replication issues. Collects such exceptions into a list and returns.
Fails if unexpected exception happened.
@@ -1542,6 +1937,17 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
return stable.empty() ? null : Assignments.fromBytes(stable.value());
}
+ private @Nullable AssignmentsChain getAssignmentsChain(IgniteImpl node,
int partId) {
+ CompletableFuture<Entry> chainFut = node.metaStorageManager()
+ .get(assignmentsChainKey(new TablePartitionId(tableId,
partId)));
+
+ assertThat(chainFut, willCompleteSuccessfully());
+
+ Entry chain = chainFut.join();
+
+ return chain.empty() ? null :
AssignmentsChain.fromBytes(chain.value());
+ }
+
@Retention(RetentionPolicy.RUNTIME)
@interface ZoneParams {
int replicas();
@@ -1549,5 +1955,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
int partitions();
int nodes() default INITIAL_NODES;
+
+ ConsistencyMode consistencyMode() default
ConsistencyMode.STRONG_CONSISTENCY;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2bae27d71c..91beb10eec 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dep
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
@@ -144,6 +146,7 @@ import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicat
import
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -983,18 +986,30 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
*/
public CompletableFuture<List<Assignments>>
writeTableAssignmentsToMetastore(
int tableId,
+ ConsistencyMode consistencyMode,
CompletableFuture<List<Assignments>> assignmentsFuture
) {
return assignmentsFuture.thenCompose(newAssignments -> {
assert !newAssignments.isEmpty();
+ boolean haMode = consistencyMode ==
ConsistencyMode.HIGH_AVAILABILITY;
+
List<Operation> partitionAssignments = new
ArrayList<>(newAssignments.size());
for (int i = 0; i < newAssignments.size(); i++) {
- ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new
TablePartitionId(tableId, i));
+ TablePartitionId tablePartitionId = new
TablePartitionId(tableId, i);
+
+ ByteArray stableAssignmentsKey =
stablePartAssignmentsKey(tablePartitionId);
byte[] anAssignment = newAssignments.get(i).toBytes();
Operation op = put(stableAssignmentsKey, anAssignment);
partitionAssignments.add(op);
+
+ if (haMode) {
+ ByteArray assignmentsChainKey =
assignmentsChainKey(tablePartitionId);
+ byte[] assignmentChain =
AssignmentsChain.of(newAssignments.get(i)).toBytes();
+ Operation chainOp = put(assignmentsChainKey,
assignmentChain);
+ partitionAssignments.add(chainOp);
+ }
}
Condition condition = notExists(new
ByteArray(toByteArray(partitionAssignments.get(0).key())));
@@ -1647,7 +1662,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
tablePendingAssignmentsGetLocally(metaStorageMgr, tableId,
zoneDescriptor.partitions(), causalityToken);
CompletableFuture<List<Assignments>>
stableAssignmentsFutureAfterInvoke =
- writeTableAssignmentsToMetastore(tableId,
stableAssignmentsFuture);
+ writeTableAssignmentsToMetastore(tableId,
zoneDescriptor.consistencyMode(), stableAssignmentsFuture);
Catalog catalog = catalogService.catalog(catalogVersion);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 6db3ecdce5..7f692452ca 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.causality.RevisionListenerRegistry;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -404,7 +405,7 @@ public class TableManagerTest extends IgniteAbstractTest {
var outerExceptionMsg = "Outer future is interrupted";
assignmentsFuture.completeExceptionally(new
TimeoutException(outerExceptionMsg));
CompletableFuture<List<Assignments>> writtenAssignmentsFuture =
tableManager
- .writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+ .writeTableAssignmentsToMetastore(tableId,
ConsistencyMode.STRONG_CONSISTENCY, assignmentsFuture);
assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
assertThrowsWithCause(writtenAssignmentsFuture::get,
TimeoutException.class, outerExceptionMsg);
@@ -414,7 +415,8 @@ public class TableManagerTest extends IgniteAbstractTest {
var innerExceptionMsg = "Inner future is interrupted";
invokeTimeoutFuture.completeExceptionally(new
TimeoutException(innerExceptionMsg));
when(msm.invoke(any(), anyList(),
anyList())).thenReturn(invokeTimeoutFuture);
- writtenAssignmentsFuture =
tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+ writtenAssignmentsFuture =
+ tableManager.writeTableAssignmentsToMetastore(tableId,
ConsistencyMode.STRONG_CONSISTENCY, assignmentsFuture);
assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
assertThrowsWithCause(writtenAssignmentsFuture::get,
TimeoutException.class, innerExceptionMsg);
}