This is an automated email from the ASF dual-hosted git repository.

ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 98ab469d838 IGNITE-24513 improved automatic reset logic (#7580)
98ab469d838 is described below

commit 98ab469d8385edc22bf8a8c3375ad855339322a0
Author: Egor <[email protected]>
AuthorDate: Tue Feb 24 16:50:53 2026 +0400

    IGNITE-24513 improved automatic reset logic (#7580)
    
    Fixed automatic reset logic to prevent rebalance overwrites during filter 
changes or node restarts.
    Now we verify that there is no pending assignment with all alive nodes and 
with a revision higher than the recovery trigger revision for the current group.
    
    Co-authored-by: Egor Kuts <[email protected]>
---
 ...ilablePartitionsRecoveryByFilterUpdateTest.java | 60 ++++++++++++++++------
 .../disaster/DisasterRecoveryManager.java          |  2 +-
 .../disaster/GroupUpdateRequestHandler.java        | 33 +++++++++++-
 .../ItDisasterRecoveryReconfigurationTest.java     |  8 +--
 4 files changed, 80 insertions(+), 23 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 8470b920920..419702554dd 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
 import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
 import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.failure.handlers.configuration.StopNodeFailureHandlerConfigurationSchema;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.table.Table;
 import org.intellij.lang.annotations.Language;
@@ -43,19 +44,19 @@ import org.junit.jupiter.api.Test;
 /** Test suite for the cases with a recovery of the group replication factor 
after reset by zone filter update. */
 public class ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends 
AbstractHighAvailablePartitionsRecoveryTest {
     private static final String GLOBAL_EU_NODES_CONFIG =
-            nodeConfig("{region = EU, zone = global}", 
"{segmented_aipersist.engine = aipersist}");
+            nodeConfigWithFailureHandler("{region = EU, zone = global}", 
"{segmented_aipersist.engine = aipersist}");
 
-    private static final String EU_ONLY_NODES_CONFIG = nodeConfig("{region = 
EU}", null);
+    private static final String EU_ONLY_NODES_CONFIG = 
nodeConfigWithFailureHandler("{region = EU}", null);
 
-    private static final String US_ONLY_NODES_CONFIG = nodeConfig("{region = 
US}", null);
+    private static final String US_ONLY_NODES_CONFIG = 
nodeConfigWithFailureHandler("{region = US}", null);
 
-    private static final String GLOBAL_NODES_CONFIG = nodeConfig("{zone = 
global}", null);
+    private static final String GLOBAL_NODES_CONFIG = 
nodeConfigWithFailureHandler("{zone = global}", null);
 
-    private static final String CUSTOM_NODES_CONFIG = nodeConfig("{zone = 
custom}", null);
+    private static final String CUSTOM_NODES_CONFIG = 
nodeConfigWithFailureHandler("{zone = custom}", null);
 
-    private static final String ROCKS_NODES_CONFIG = nodeConfig(null, 
"{lru_rocks.engine = rocksdb}");
+    private static final String ROCKS_NODES_CONFIG = 
nodeConfigWithFailureHandler(null, "{lru_rocks.engine = rocksdb}");
 
-    private static final String AIPERSIST_NODES_CONFIG = nodeConfig(null, 
"{segmented_aipersist.engine = aipersist}");
+    private static final String AIPERSIST_NODES_CONFIG = 
nodeConfigWithFailureHandler(null, "{segmented_aipersist.engine = aipersist}");
 
     @Override
     protected int initialNodes() {
@@ -67,6 +68,39 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
         return GLOBAL_EU_NODES_CONFIG;
     }
 
+    private static String nodeConfigWithFailureHandler(
+            @Nullable String nodeAttributes,
+            @Nullable String storageProfiles
+    ) {
+        return "ignite {\n"
+                + "  nodeAttributes.nodeAttributes: " + nodeAttributes + ",\n"
+                + (storageProfiles == null ? "" : "  storage.profiles: " + 
storageProfiles + ",\n")
+                + "  network: {\n"
+                + "    port: {},\n"
+                + "    nodeFinder: {\n"
+                + "      netClusterNodes: [ {} ]\n"
+                + "    },\n"
+                + "    membership: {\n"
+                + "      membershipSyncIntervalMillis: 1000,\n"
+                + "      failurePingIntervalMillis: 500,\n"
+                + "      scaleCube: {\n"
+                + "        membershipSuspicionMultiplier: 1,\n"
+                + "        failurePingRequestMembers: 1,\n"
+                + "        gossipIntervalMillis: 10\n"
+                + "      },\n"
+                + "    }\n"
+                + "  },\n"
+                + "  clientConnector: { port:{} }, \n"
+                + "  rest.port: {},\n"
+                + "  failureHandler: {\n"
+                + "    handler: {\n"
+                + "      type: \"" + 
StopNodeFailureHandlerConfigurationSchema.TYPE + "\"\n"
+                + "    },\n"
+                + "    dumpThreadsOnFailure: false\n"
+                + "  }\n"
+                + "}";
+    }
+
     @Test
    @Disabled("https://issues.apache.org/jira/browse/IGNITE-25285")
     void testScaleUpAfterZoneFilterUpdate() throws InterruptedException {
@@ -164,7 +198,6 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24513")
     void testSeveralHaResetsAndSomeNodeRestart() throws Exception {
         for (int i = 1; i < 8; i++) {
             startNode(i, CUSTOM_NODES_CONFIG);
@@ -195,16 +228,9 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
         // Stop the last node (G)
         stopNode(1);
 
-        // Start one node from phase 1 (A)
-        startNode(4);
-
-        // Start one node from phase 3 (G)
-        startNode(1);
-
-        //  Start one node from phase 2 (E)
-        startNode(2);
+        startNodes(4, 1, 2);
 
-        waitAndAssertStableAssignmentsOfPartitionEqualTo(node0, HA_TABLE_NAME, 
PARTITION_IDS, nodeNames(1, 2, 4));
+        
waitThatAllRebalancesHaveFinishedAndStableAssignmentsEqualsToExpected(node0, 
HA_TABLE_NAME, PARTITION_IDS,  nodeNames(1, 2, 4));
 
         // Verify that no data is lost and reads from partition on nodes A and 
E are consistent with node G
         assertValuesPresentOnNodes(node0.clock().now(), table, 1, 2, 4);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 70b66098462..d7ab9cf511f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -1345,7 +1345,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         return zoneDescriptor;
     }
 
-    private static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
+    static ByteArray zoneRecoveryTriggerRevisionKey(int zoneId) {
         return new ByteArray(RECOVERY_TRIGGER_REVISION_KEY_PREFIX + zoneId);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 90f54e63bcd..42aec0b81bf 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -37,6 +37,7 @@ import static 
org.apache.ignite.internal.partition.replicator.network.disaster.L
 import static 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum.HEALTHY;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator.pendingAssignmentsCalculator;
+import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneRecoveryTriggerRevisionKey;
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -75,6 +76,7 @@ import 
org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -306,7 +308,6 @@ class GroupUpdateRequestHandler {
             if (manualUpdate) {
                 enrichAssignments(partId, aliveDataNodes, partitions, 
replicas, consensusGroupSize, partAssignments);
             }
-
             // We need to recalculate assignments to ensure that we have a 
valid set of nodes with correct roles (peers/learners).
             partAssignments = calculateAssignmentForPartition(
                     
partAssignments.stream().map(Assignment::consistentId).collect(toSet()),
@@ -339,6 +340,15 @@ class GroupUpdateRequestHandler {
                 if (entry != null) {
                     AssignmentsQueue pendingQueue = 
AssignmentsQueue.fromBytes(entry.value());
                     if (pendingQueue != null && !pendingQueue.isEmpty()) {
+                        ByteArray recoveryTriggerRevisionKey = 
zoneRecoveryTriggerRevisionKey(partId.zoneId());
+                        var recoveryTriggerRevisionEntry = 
metaStorageMgr.getLocally(recoveryTriggerRevisionKey, revision);
+                        long reductionRevision = (recoveryTriggerRevisionEntry 
!= null && recoveryTriggerRevisionEntry.value() != null)
+                                ? 
ByteUtils.bytesToLongKeepingOrder(recoveryTriggerRevisionEntry.value())
+                                : -1L;
+                        if (entry.revision() > reductionRevision
+                                && 
pendingQueueIsViableForRecovery(pendingQueue, aliveNodesConsistentIds, 
replicas)) {
+                            return 
completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+                        }
                         AssignmentsQueue filteredPendingQueue = 
filterAliveNodesOnly(pendingQueue, aliveNodesConsistentIds);
                         assignmentsQueue = new 
AssignmentsQueue(assignmentsQueue, filteredPendingQueue);
                     }
@@ -357,6 +367,27 @@ class GroupUpdateRequestHandler {
         });
     }
 
+    private static boolean pendingQueueIsViableForRecovery(AssignmentsQueue 
queue, Set<String> aliveNodesConsistentIds, int replicas) {
+        // Lets assume we have nodes A, B, C, D, E.
+        // C, D, E restart.
+        // Reset timeout triggers.
+        // Node C, D, E get back online to logical topology and create 
pending=[A, B, C, D, E].
+        // Reset proceeds and sees A, B, C, D, E online at its revision.
+        // Then it overwrites existing pending from above with pending=[A]. 
planned=[A,B].
+        // To prevent this scenario we should skip such reset.
+
+        for (Assignments assignments : queue) {
+            if (assignments
+                    .nodes()
+                    .stream()
+                    .map(Assignment::consistentId).anyMatch(name -> 
!aliveNodesConsistentIds.contains(name))
+            ) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private static AssignmentsQueue filterAliveNodesOnly(AssignmentsQueue 
queue, Set<String> aliveNodesConsistentIds) {
         List<Assignments> filteredAssignments = new ArrayList<>();
 
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 9d2d295e3d8..68567108c5b 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -811,7 +811,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         waitForScale(node0, 2);
 
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
-        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
 
         assertThat(updateFuture, willSucceedIn(60, SECONDS));
 
@@ -859,7 +859,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         stopNodesInParallel(1);
 
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
-        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
 
         assertThat(updateFuture, willCompleteSuccessfully());
 
@@ -932,7 +932,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         blockRebalanceStableSwitch(partId, assignmentToBlock);
 
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
-        CompletableFuture<Void> resetFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+        CompletableFuture<Void> resetFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
         assertThat(resetFuture, willCompleteSuccessfully());
 
         // force == true, fromReset == false.
@@ -1215,7 +1215,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         blockRebalanceStableSwitch(partId, assignmentsToBlock);
 
         DisasterRecoveryManager disasterRecoveryManager = 
node0.disasterRecoveryManager();
-        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
+        CompletableFuture<?> updateFuture = 
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1000);
         assertThat(updateFuture, willCompleteSuccessfully());
 
         // It's important to unblock appendEntries requests after 
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index

Reply via email to