This is an automated email from the ASF dual-hosted git repository.
ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 98ab469d838 IGNITE-24513 improved automatic reset logic (#7580)
98ab469d838 is described below
commit 98ab469d8385edc22bf8a8c3375ad855339322a0
Author: Egor <[email protected]>
AuthorDate: Tue Feb 24 16:50:53 2026 +0400
IGNITE-24513 improved automatic reset logic (#7580)
Fixed the automatic reset logic so that it no longer overwrites an in-progress
rebalance during zone filter changes or node restarts.
Before resetting a group, we now check whether it already has a pending
assignment that consists only of alive nodes and was written at a revision
higher than the zone's recovery trigger revision; if so, the reset is skipped
for that group.
Co-authored-by: Egor Kuts <[email protected]>
---
...ilablePartitionsRecoveryByFilterUpdateTest.java | 60 ++++++++++++++++------
.../disaster/DisasterRecoveryManager.java | 2 +-
.../disaster/GroupUpdateRequestHandler.java | 33 +++++++++++-
.../ItDisasterRecoveryReconfigurationTest.java | 8 +--
4 files changed, 80 insertions(+), 23 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 8470b920920..419702554dd 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeFailureHandlerConfigurationSchema;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.table.Table;
import org.intellij.lang.annotations.Language;
@@ -43,19 +44,19 @@ import org.junit.jupiter.api.Test;
/** Test suite for the cases with a recovery of the group replication factor
after reset by zone filter update. */
public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends
AbstractHighAvailablePartitionsRecoveryTest {
private static final String GLOBAL_EU_NODES_CONFIG =
- nodeConfig("{region = EU, zone = global}",
"{segmented_aipersist.engine = aipersist}");
+ nodeConfigWithFailureHandler("{region = EU, zone = global}",
"{segmented_aipersist.engine = aipersist}");
- private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region =
EU}", null);
+ private static final String EU_ONLY_NODES_CONFIG =
nodeConfigWithFailureHandler("{region = EU}", null);
- private static final String US_ONLY_NODES_CONFIG = nodeConfig("{region =
US}", null);
+ private static final String US_ONLY_NODES_CONFIG =
nodeConfigWithFailureHandler("{region = US}", null);
- private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone =
global}", null);
+ private static final String GLOBAL_NODES_CONFIG =
nodeConfigWithFailureHandler("{zone = global}", null);
- private static final String CUSTOM_NODES_CONFIG = nodeConfig("{zone =
custom}", null);
+ private static final String CUSTOM_NODES_CONFIG =
nodeConfigWithFailureHandler("{zone = custom}", null);
- private static final String ROCKS_NODES_CONFIG = nodeConfig(null,
"{lru_rocks.engine = rocksdb}");
+ private static final String ROCKS_NODES_CONFIG =
nodeConfigWithFailureHandler(null, "{lru_rocks.engine = rocksdb}");
- private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null,
"{segmented_aipersist.engine = aipersist}");
+ private static final String AIPERSIST_NODES_CONFIG =
nodeConfigWithFailureHandler(null, "{segmented_aipersist.engine = aipersist}");
@Override
protected int initialNodes() {
@@ -67,6 +68,39 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
return GLOBAL_EU_NODES_CONFIG;
}
+ private static String nodeConfigWithFailureHandler( // Builds a node HOCON config string that also installs the stop-node failure handler.
+ @Nullable String nodeAttributes, // concatenated as-is; a null value is rendered as the literal text "null"
+ @Nullable String storageProfiles // the storage.profiles entry is omitted entirely when null
+ ) {
+ return "ignite {\n" // NOTE(review): the "{}" tokens below look like placeholders substituted by the test harness — confirm
+ + " nodeAttributes.nodeAttributes: " + nodeAttributes + ",\n"
+ + (storageProfiles == null ? "" : " storage.profiles: " +
storageProfiles + ",\n")
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " },\n"
+ + " membership: {\n"
+ + " membershipSyncIntervalMillis: 1000,\n"
+ + " failurePingIntervalMillis: 500,\n"
+ + " scaleCube: {\n"
+ + " membershipSuspicionMultiplier: 1,\n"
+ + " failurePingRequestMembers: 1,\n"
+ + " gossipIntervalMillis: 10\n"
+ + " },\n"
+ + " }\n"
+ + " },\n"
+ + " clientConnector: { port:{} }, \n"
+ + " rest.port: {},\n"
+ + " failureHandler: {\n" // StopNodeFailureHandler type, with thread dumps on failure disabled
+ + " handler: {\n"
+ + " type: \"" +
StopNodeFailureHandlerConfigurationSchema.TYPE + "\"\n"
+ + " },\n"
+ + " dumpThreadsOnFailure: false\n"
+ + " }\n"
+ + "}";
+ }
+
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-25285")
void testScaleUpAfterZoneFilterUpdate() throws InterruptedException {
@@ -164,7 +198,6 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24513")
void testSeveralHaResetsAndSomeNodeRestart() throws Exception {
for (int i = 1; i < 8; i++) {
startNode(i, CUSTOM_NODES_CONFIG);
@@ -195,16 +228,9 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
// Stop the last node (G)
stopNode(1);
- // Start one node from phase 1 (A)
- startNode(4);
-
- // Start one node from phase 3 (G)
- startNode(1);
-
- // Start one node from phase 2 (E)
- startNode(2);
+ startNodes(4, 1, 2);
- waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME,
PARTITION_IDS, nodeNames(1, 2, 4));
+
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node0,
HA_TABLE_NAME, PARTITION_IDS, nodeNames(1, 2, 4));
// Verify that no data is lost and reads from partition on nodes A and
E are consistent with node G
assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 4);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 70b66098462..d7ab9cf511f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -1345,7 +1345,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return zoneDescriptor;
}
- private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
+ static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 90f54e63bcd..42aec0b81bf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -37,6 +37,7 @@ import static
org.apache.ignite.internal.partition.replicator.network.disaster.L
import static
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator.pendingAssignmentsCalculator;
+import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneRecoveryTriggerRevisionKey;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -75,6 +76,7 @@ import
org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
@@ -306,7 +308,6 @@ class GroupUpdateRequestHandler {
if (manualUpdate) {
enrichAssignments(partId, aliveDataNodes, partitions,
replicas, consensusGroupSize, partAssignments);
}
-
// We need to recalculate assignments to ensure that we have a
valid set of nodes with correct roles (peers/learners).
partAssignments = calculateAssignmentForPartition(
partAssignments.stream().map(Assignment::consistentId).collect(toSet()),
@@ -339,6 +340,15 @@ class GroupUpdateRequestHandler {
if (entry != null) {
AssignmentsQueue pendingQueue =
AssignmentsQueue.fromBytes(entry.value());
if (pendingQueue != null && !pendingQueue.isEmpty()) {
+ ByteArray recoveryTriggerRevisionKey =
zoneRecoveryTriggerRevisionKey(partId.zoneId());
+ var recoveryTriggerRevisionEntry =
metaStorageMgr.getLocally(recoveryTriggerRevisionKey, revision);
+ long reductionRevision = (recoveryTriggerRevisionEntry
!= null && recoveryTriggerRevisionEntry.value() != null)
+ ?
ByteUtils.bytesToLongKeepingOrder(recoveryTriggerRevisionEntry.value())
+ : -1L;
+ if (entry.revision() > reductionRevision
+ &&
pendingQueueIsViableForRecovery(pendingQueue, aliveNodesConsistentIds,
replicas)) {
+ return
completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+ }
AssignmentsQueue filteredPendingQueue =
filterAliveNodesOnly(pendingQueue, aliveNodesConsistentIds);
assignmentsQueue = new
AssignmentsQueue(assignmentsQueue, filteredPendingQueue);
}
@@ -357,6 +367,27 @@ class GroupUpdateRequestHandler {
});
}
+ private static boolean pendingQueueIsViableForRecovery(AssignmentsQueue
queue, Set<String> aliveNodesConsistentIds, int replicas) {
+ // Returns true only if every node of every assignments entry in the queue is alive.
+ // Why: let's assume nodes A, B, C, D, E; C, D and E restart, and the reset timeout fires.
+ // C, D and E rejoin the logical topology, which creates pending=[A, B, C, D, E].
+ // The reset then proceeds, sees A, B, C, D, E online at its revision, and would
+ // overwrite that pending with pending=[A], planned=[A, B].
+ // A viable pending queue lets the caller skip the reset instead of clobbering it.
+ // NOTE(review): the 'replicas' parameter is not used by this method.
+
+ for (Assignments assignments : queue) {
+ if (assignments
+ .nodes()
+ .stream()
+ .map(Assignment::consistentId).anyMatch(name ->
!aliveNodesConsistentIds.contains(name))
+ ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private static AssignmentsQueue filterAliveNodesOnly(AssignmentsQueue
queue, Set<String> aliveNodesConsistentIds) {
List<Assignments> filteredAssignments = new ArrayList<>();
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 9d2d295e3d8..68567108c5b 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -811,7 +811,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
waitForScale(node0, 2);
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
- CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+ CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
assertThat(updateFuture, willSucceedIn(60, SECONDS));
@@ -859,7 +859,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
stopNodesInParallel(1);
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
- CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+ CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
assertThat(updateFuture, willCompleteSuccessfully());
@@ -932,7 +932,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
blockRebalanceStableSwitch(partId, assignmentToBlock);
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
- CompletableFuture<Void> resetFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+ CompletableFuture<Void> resetFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
assertThat(resetFuture, willCompleteSuccessfully());
// force == true, fromReset == false.
@@ -1215,7 +1215,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
blockRebalanceStableSwitch(partId, assignmentsToBlock);
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
- CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+ CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
assertThat(updateFuture, willCompleteSuccessfully());
// It's important to unblock appendEntries requests after
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index